Author: kamrul
Date: Fri Oct 26 21:34:03 2012
New Revision: 1402673
URL: http://svn.apache.org/viewvc?rev=1402673&view=rev
Log:
OOZIE-1033 Generic utility class to register/unregister a JMS message
handler(Mohammad)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java?rev=1402673&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
Fri Oct 26 21:34:03 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import javax.jms.Message;
+
+public interface MessageHandler {
+    /**
+     * Handle a single JMS message of any type.
+     *
+     * @param msg the message to be processed
+     */
+    void process(Message msg);
+}
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java?rev=1402673&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
Fri Oct 26 21:34:03 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XLog;
+
+/** JMS listener that passes each received message to a {@link MessageHandler}. */
+public class MessageReceiver implements MessageListener {
+    private static final XLog LOG = XLog.getLog(MessageReceiver.class);
+    private final MessageHandler msgHandler;
+    private MessageConsumer consumer;
+
+    /** @param handler receives every message; when null, messages are only logged */
+    public MessageReceiver(MessageHandler handler) {
+        this.msgHandler = handler;
+    }
+
+    /**
+     * Register this receiver as the listener for a topic on the default end point.
+     *
+     * @param topicName : topic name
+     * @throws JMSException if the registration fails
+     */
+    public void registerTopic(String topicName) throws JMSException {
+        registerTopic(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, topicName);
+    }
+
+    /**
+     * Register this receiver as the listener for a topic on the given end point.
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        to determine the JMS connection properties
+     * @param topicName : topic name
+     * @throws JMSException if the registration fails
+     */
+    public void registerTopic(String endPoint, String topicName) throws JMSException {
+        // Look the consumer up on the requested end point, not the default one.
+        consumer = Services.get().get(JMSAccessorService.class).getMessageConsumer(endPoint, topicName);
+        consumer.setMessageListener(this);
+        LOG.info(" Listener registered for end point ." + endPoint + " topic :" + topicName);
+    }
+
+    /**
+     * Remove the session held for a topic on the default end point.
+     *
+     * @param topicName : name of a topic
+     * @throws JMSException if the removal fails
+     */
+    public void unRegisterTopic(String topicName) throws JMSException {
+        unRegisterTopic(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, topicName);
+    }
+
+    /**
+     * Remove the session held for a topic on the given end point.
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     * @param topicName : name of a topic
+     * @throws JMSException if the removal fails
+     */
+    public void unRegisterTopic(String endPoint, String topicName) throws JMSException {
+        Services.get().get(JMSAccessorService.class).removeSession(endPoint, topicName);
+        LOG.info("Unregister endPoint :" + endPoint + " topic :" + topicName);
+    }
+
+    /**
+     * Hand the message to the handler, or log it when no handler was supplied.
+     *
+     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+     */
+    @Override
+    public synchronized void onMessage(Message msg) {
+        if (msgHandler != null) {
+            msgHandler.process(msg);
+        }
+        else {
+            LOG.info("Message handler none. Unprocessed messsage " + msg);
+        }
+    }
+
+    /** Best-effort close of the consumer, if one was registered, at finalization. */
+    @Override
+    public void finalize() {
+        // A failed close is only logged; it is never rethrown from here
+        if (consumer != null) {
+            try {
+                consumer.close();
+            }
+            catch (JMSException e) {
+                LOG.warn("Unable to close the consumer ", e);
+            }
+        }
+    }
+}
Added:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java?rev=1402673&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java
(added)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java
Fri Oct 26 21:34:03 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.Arrays;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.jms.MessageReceiver;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XTestCase;
+
+public class TestMessageReceiver extends XTestCase {
+    private Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        // List JMSAccessorService as the service class and point the "default" JMS entry at ActiveMQ's vm:// URL.
+        Configuration settings = services.getConf();
+        String serviceClasses = StringUtils.join(",", Arrays.asList(JMSAccessorService.class.getName()));
+        String jmsProps = "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false,";
+        settings.set(Services.CONF_SERVICE_CLASSES, serviceClasses);
+        settings.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsProps);
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Registers a printing handler on a topic, waits one second, then unregisters it. */
+    public void testMessage() throws Exception {
+        String topic = "test-topic";
+        MessageReceiver receiver = new MessageReceiver(new MessageHandler() {
+            public void process(Message msg) {
+                if (msg instanceof ObjectMessage) {
+                    System.out.println("Object Message: " + msg);
+                    return;
+                }
+                if (msg instanceof MapMessage) {
+                    System.out.println("MapMessage : " + msg);
+                    return;
+                }
+                try {
+                    System.out.println("Unexpected message type " + msg.getJMSType());
+                }
+                catch (JMSException e) {
+                    System.out.println("Unable to read the type " + e);
+                }
+            }
+        });
+        receiver.registerTopic(topic);
+        Thread.sleep(1000);
+        receiver.unRegisterTopic(topic);
+    }
+}