Author: kamrul
Date: Fri Oct 26 21:34:03 2012
New Revision: 1402673

URL: http://svn.apache.org/viewvc?rev=1402673&view=rev
Log:
OOZIE-1033 Generic utility class to register/unregister a JMS message 
handler(Mohammad)

Added:
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
    oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java?rev=1402673&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageHandler.java
 Fri Oct 26 21:34:03 2012
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import javax.jms.Message;
+
+public interface MessageHandler {
+    /**
+     * Handles a single JMS message handed over by the caller.
+     *
+     * @param msg the JMS message to be processed
+     */
+    void process(Message msg);
+}

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java?rev=1402673&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
 Fri Oct 26 21:34:03 2012
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XLog;
+
+/** JMS listener that hands every received message to a MessageHandler. */
+public class MessageReceiver implements MessageListener {
+    private static final XLog LOG = XLog.getLog(MessageReceiver.class);
+    private final MessageHandler msgHandler;
+    private MessageConsumer consumer;
+
+    /** Creates a receiver that delegates each message to the given handler. */
+    public MessageReceiver(MessageHandler handler) {
+        this.msgHandler = handler;
+    }
+
+    /**
+     * Register a JMS message listener for a topic on the default end point
+     *
+     * @param topicName : topic name
+     * @throws JMSException
+     */
+    public void registerTopic(String topicName) throws JMSException {
+        registerTopic(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, topicName);
+    }
+
+    /**
+     * Register a JMS message listener for a specific topic and end point
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        to determine the JMS connection properties
+     * @param topicName : topic name
+     * @throws JMSException
+     */
+    public void registerTopic(String endPoint, String topicName) throws JMSException {
+        consumer = Services.get().get(JMSAccessorService.class)
+                .getMessageConsumer(endPoint, topicName);
+        consumer.setMessageListener(this);
+        LOG.info(" Listener registered for end point ." + endPoint + " topic :" + topicName);
+    }
+
+    /**
+     * Unregister by removing the existing session for the default end point
+     *
+     * @param topicName : name of a topic
+     * @throws JMSException
+     */
+    public void unRegisterTopic(String topicName) throws JMSException {
+        unRegisterTopic(JMSAccessorService.DEFAULT_SERVER_ENDPOINT, topicName);
+    }
+
+    /**
+     * Unregister by removing the existing session for the given end point
+     *
+     * @param endPoint : Service end-point (preferably HCatalog server address)
+     *        whose session is removed
+     * @param topicName : name of a topic
+     * @throws JMSException
+     */
+    public void unRegisterTopic(String endPoint, String topicName) throws JMSException {
+        Services.get().get(JMSAccessorService.class).removeSession(endPoint, topicName);
+        LOG.info("Unregister endPoint :" + endPoint + " topic :" + topicName);
+    }
+
+    /**
+     * Pass the message to the handler, or just log it when no handler is set.
+     *
+     * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
+     */
+    @Override
+    public synchronized void onMessage(Message msg) {
+        if (msgHandler != null) {
+            msgHandler.process(msg);
+        }
+        else {
+            LOG.info("Message handler none. Unprocessed messsage " + msg);
+        }
+    }
+
+    @Override
+    public void finalize() {
+        // Close the consumer during finalizing
+        if (consumer != null) {
+            try {
+                consumer.close();
+            }
+            catch (JMSException e) {
+                LOG.warn("Unable to close the consumer ", e);
+            }
+        }
+    }
+}

Added: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java?rev=1402673&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/jms/TestMessageReceiver.java
 Fri Oct 26 21:34:03 2012
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.jms;
+
+import java.util.Arrays;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.jms.MessageReceiver;
+import org.apache.oozie.service.JMSAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XTestCase;
+
+/** Registers and then unregisters a MessageReceiver on a test topic. */
+public class TestMessageReceiver extends XTestCase {
+    private Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        String serviceClasses = StringUtils.join(",", Arrays.asList(JMSAccessorService.class.getName()));
+        String jmsProps = "default=java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#vm://localhost?broker.persistent=false,";
+        Configuration conf = services.getConf();
+        conf.set(Services.CONF_SERVICE_CLASSES, serviceClasses);
+        conf.set(JMSAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsProps);
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testMessage() throws Exception {
+        MessageHandler printer = new MessageHandler() {
+            public void process(Message msg) {
+                if (msg instanceof ObjectMessage) {
+                    System.out.println("Object Message: " + msg);
+                    return;
+                }
+                if (msg instanceof MapMessage) {
+                    System.out.println("MapMessage : " + msg);
+                    return;
+                }
+                try {
+                    System.out.println("Unexpected message type " + msg.getJMSType());
+                }
+                catch (JMSException e) {
+                    System.out.println("Unable to read the type " + e);
+                }
+            }
+        };
+        MessageReceiver receiver = new MessageReceiver(printer);
+        final String topic = "test-topic";
+        receiver.registerTopic(topic);
+        Thread.sleep(1000);
+        receiver.unRegisterTopic(topic);
+    }
+}


Reply via email to