Author: avandana
Date: Mon Nov 12 20:01:32 2012
New Revision: 1408431
URL: http://svn.apache.org/viewvc?rev=1408431&view=rev
Log:
HCAT-548 Move topic creation in NotificationListener to a separate method
Modified:
incubator/hcatalog/branches/branch-0.4/CHANGES.txt
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java
Modified: incubator/hcatalog/branches/branch-0.4/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/CHANGES.txt?rev=1408431&r1=1408430&r2=1408431&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.4/CHANGES.txt Mon Nov 12 20:01:32 2012
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
HCAT-427 Document storage-based authorization (lefty via gates)
IMPROVEMENTS
+ HCAT-548 Move topic creation in NotificationListener to a separate method (amalakar via avandana)
+
HCAT-538 HCatalogStorer fails for 100GB of data with dynamic partitioning, number of partition is 300 (amalakar via toffer)
HCAT-492 Document CTAS workaround for Hive with JSON serde (lefty via khorgath)
Modified:
incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1408431&r1=1408430&r2=1408431&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/branches/branch-0.4/src/java/org/apache/hcatalog/listener/NotificationListener.java Mon Nov 12 20:01:32 2012
@@ -34,6 +34,7 @@ import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@@ -195,7 +196,7 @@ public class NotificationListener extend
me.initCause(e);
throw me;
}
- send(newTbl,getTopicPrefix(conf)+ "."+ newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT);
+ send(newTbl, getTopicPrefix(conf) + "." + newTbl.getDbName().toLowerCase(), HCatConstants.HCAT_ADD_TABLE_EVENT);
}
}
@@ -236,7 +237,6 @@ public class NotificationListener extend
try{
- Destination topic = null;
if(null == session){
// this will happen, if we never able to establish a connection.
createConnection();
@@ -247,24 +247,16 @@ public class NotificationListener extend
return;
}
}
- try{
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- topic = session.createTopic(topicName);
- } catch (IllegalStateException ise){
- // this will happen if we were able to establish connection once, but its no longer valid,
- // ise is thrown, catch it and retry.
- LOG.error("Seems like connection is lost. Retrying", ise);
- createConnection();
- topic = session.createTopic(topicName);
- }
- if (null == topic){
- // Still not successful, return from here.
- LOG.error("Invalid session. Failed to send message on topic: "+
- topicName + " event: "+event);
- return;
- }
- MessageProducer producer = session.createProducer(topic);
+ Destination topic = null;
+ topic = getTopic(topicName);
+ if (null == topic){
+ // Still not successful, return from here.
+ LOG.error("Invalid session. Failed to send message on topic: "+
+ topicName + " event: "+ event);
+ return;
+ }
+
+ MessageProducer producer = session.createProducer(topic);
Message msg;
if (msgBody instanceof Map){
MapMessage mapMsg = session.createMapMessage();
@@ -289,6 +281,29 @@ public class NotificationListener extend
}
}
+ /**
+ * Get the topic object for the topicName, it also tries to reconnect
+ * if the connection appears to be broken.
+ * @param topicName
+ * @return
+ * @throws JMSException
+ */
+ protected Topic getTopic(final String topicName) throws JMSException {
+ Topic topic;
+ try{
+ // Topics are created on demand. If it doesn't exist on broker it will
+ // be created when broker receives this message.
+ topic = session.createTopic(topicName);
+ } catch (IllegalStateException ise){
+ // this will happen if we were able to establish connection once, but its no longer valid,
+ // ise is thrown, catch it and retry.
+ LOG.error("Seems like connection is lost. Retrying", ise);
+ createConnection();
+ topic = session.createTopic(topicName);
+ }
+ return topic;
+ }
+
protected void createConnection(){
Context jndiCntxt;