Repository: hive Updated Branches: refs/heads/master 148807a98 -> 8412748a7
HIVE-18330 : Fix TestMsgBusConnection - doesn't test tests the original intention (Zoltan Haindrich via Ashutosh Chauhan) Signed-off-by: Ashutosh Chauhan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8412748a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8412748a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8412748a Branch: refs/heads/master Commit: 8412748a7ee6264b5365b211affe44af35cae9b0 Parents: 148807a Author: Zoltan Haindrich <[email protected]> Authored: Fri Dec 22 05:50:00 2017 -0800 Committer: Ashutosh Chauhan <[email protected]> Committed: Mon Jan 8 15:09:03 2018 -0800 ---------------------------------------------------------------------- .../hcatalog/listener/NotificationListener.java | 3 + .../hcatalog/listener/TestMsgBusConnection.java | 91 +++++++++----------- 2 files changed, 46 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8412748a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java index 66349c5..fbdebc0 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/NotificationListener.java @@ -422,6 +422,9 @@ public class NotificationListener extends MetaStoreEventListener { * @throws JMSException */ protected Session createSession() throws JMSException { + if (conn == null) { + return null; + } // We want message to be sent when session commits, thus we run in // transacted mode. return conn.createSession(true, Session.SESSION_TRANSACTED); http://git-wip-us.apache.org/repos/asf/hive/blob/8412748a/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestMsgBusConnection.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestMsgBusConnection.java b/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestMsgBusConnection.java index 7c5d6d9..4319f2a 100644 --- a/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestMsgBusConnection.java +++ b/hcatalog/server-extensions/src/test/java/org/apache/hive/hcatalog/listener/TestMsgBusConnection.java @@ -19,68 +19,69 @@ package org.apache.hive.hcatalog.listener; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.TextMessage; import javax.jms.Session; - -import junit.framework.TestCase; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; import org.apache.hadoop.hive.ql.DriverFactory; import org.apache.hadoop.hive.ql.IDriver; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.messaging.HCatEventMessage; import org.apache.hive.hcatalog.messaging.jms.MessagingUtils; +import org.junit.Before; +import org.junit.Test; -public class TestMsgBusConnection extends TestCase { +public class TestMsgBusConnection { private IDriver driver; private BrokerService broker; private MessageConsumer consumer; private static final int TIMEOUT = 2000; - @Override - protected void setUp() throws Exception { - super.setUp(); + @Before + public void before() throws Exception { + broker = new BrokerService(); // configure the broker broker.addConnector("tcp://localhost:61616?broker.persistent=false"); broker.start(); - System.setProperty("java.naming.factory.initial", - "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); + System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory"); System.setProperty("java.naming.provider.url", "tcp://localhost:61616"); connectClient(); HiveConf hiveConf = new HiveConf(this.getClass()); - hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, - NotificationListener.class.getName()); + hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname, NotificationListener.class.getName()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, ""); hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER, - "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); + "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory"); hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, "planetlab.hcat"); SessionState.start(new CliSessionState(hiveConf)); driver = DriverFactory.newDriver(hiveConf); } private void connectClient() throws JMSException { - ConnectionFactory connFac = new ActiveMQConnectionFactory( - "tcp://localhost:61616"); + ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = connFac.createConnection(); conn.start(); Session session = conn.createSession(true, Session.SESSION_TRANSACTED); @@ -88,39 +89,33 @@ public class TestMsgBusConnection extends TestCase { consumer = session.createConsumer(hcatTopic); } + @Test public void testConnection() throws Exception { + driver.run("create database testconndb"); + Message msg = consumer.receive(TIMEOUT); + assertTrue("Expected TextMessage", msg instanceof TextMessage); + assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + HCatEventMessage messageObject = MessagingUtils.getMessage(msg); + assertEquals("testconndb", messageObject.getDB()); + broker.stop(); + runQuery("drop database testconndb cascade"); + broker.start(true); + connectClient(); + runQuery("create database testconndb"); + msg = consumer.receive(TIMEOUT); + assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", messageObject.getDB()); + driver.run("drop database testconndb cascade"); + msg = consumer.receive(TIMEOUT); + assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT)); + assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); + assertEquals("testconndb", messageObject.getDB()); + } - try { - driver.run("create database testconndb"); - Message msg = consumer.receive(TIMEOUT); - assertTrue("Expected TextMessage", msg instanceof TextMessage); - assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - HCatEventMessage messageObject = MessagingUtils.getMessage(msg); - assertEquals("testconndb", messageObject.getDB()); - broker.stop(); - driver.run("drop database testconndb cascade"); - broker.start(true); - connectClient(); - driver.run("create database testconndb"); - msg = consumer.receive(TIMEOUT); - assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", messageObject.getDB()); - driver.run("drop database testconndb cascade"); - msg = consumer.receive(TIMEOUT); - assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, - msg.getStringProperty(HCatConstants.HCAT_EVENT)); - assertEquals("topic://planetlab.hcat", msg.getJMSDestination().toString()); - assertEquals("testconndb", messageObject.getDB()); - } catch (NoSuchObjectException nsoe) { - nsoe.printStackTrace(System.err); - assert false; - } catch (AlreadyExistsException aee) { - aee.printStackTrace(System.err); - assert false; - } + private void runQuery(String query) throws CommandNeedRetryException { + CommandProcessorResponse cpr = driver.run(query); + assertFalse(cpr.getMessage(), cpr.failed()); } }
