This is an automated email from the ASF dual-hosted git repository.

veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a02fe662ba1 HIVE-23680 : TestDbNotificationListener is unstable (Kirti 
Ruge, reviewed by Zsolt Miskolczi, Laszlo Vegh)
a02fe662ba1 is described below

commit a02fe662ba1548dc5a80041d0649e131ca1be789
Author: rkirtir <[email protected]>
AuthorDate: Mon Aug 28 13:22:24 2023 +0530

    HIVE-23680 : TestDbNotificationListener is unstable (Kirti Ruge, reviewed 
by Zsolt Miskolczi, Laszlo Vegh)
---
 itests/hcatalog-unit/pom.xml                       |   1 +
 .../listener/TestDbNotificationCleanup.java        | 189 ++++++++++++++++++
 .../listener/TestDbNotificationListener.java       | 215 +++------------------
 .../TestTransactionalDbNotificationListener.java   | 169 ++++++++++++++++
 4 files changed, 384 insertions(+), 190 deletions(-)

diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index 5183a2ba802..4a049697b71 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -89,6 +89,7 @@
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-standalone-metastore-server</artifactId>
+      <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
     <dependency>
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java
new file mode 100644
index 00000000000..e73a57ea58f
--- /dev/null
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationCleanup.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import java.util.concurrent.TimeUnit;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL;
+
+
+
+public class TestDbNotificationCleanup {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TestDbNotificationCleanup.class
+            .getName());
+    private static final int EVENTS_TTL = 30;
+    private static final int CLEANUP_SLEEP_TIME = 10;
+    private static Map<String, String> emptyParameters = new HashMap<String, 
String>();
+    private static IMetaStoreClient msClient;
+    private static IDriver driver;
+    private static MessageDeserializer md;
+    private static HiveConf conf;
+
+    private long firstEventId;
+    private final String testTempDir = 
Paths.get(System.getProperty("java.io.tmpdir"), "testDbNotif").toString();
+    @SuppressWarnings("rawtypes")
+    @BeforeClass
+    public static void connectToMetastore() throws Exception {
+        conf = new HiveConf();
+
+        
MetastoreConf.setVar(conf,MetastoreConf.ConfVars.TRANSACTIONAL_EVENT_LISTENERS,
+                "org.apache.hive.hcatalog.listener.DbNotificationListener");
+        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+        conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+        conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, 
DummyRawStoreFailEvent.class.getName());
+        MetastoreConf.setVar(conf, 
MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY, 
JSONMessageEncoder.class.getName());
+        MetastoreConf.setTimeVar(conf, 
MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, CLEANUP_SLEEP_TIME, 
TimeUnit.SECONDS);
+        MetastoreConf.setTimeVar(conf, 
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, EVENTS_TTL, TimeUnit.SECONDS);
+        MetastoreConf.setTimeVar(conf, 
EVENT_DB_LISTENER_CLEAN_STARTUP_WAIT_INTERVAL, 20, TimeUnit.SECONDS);
+        conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+                
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+        SessionState.start(new CliSessionState(conf));
+        msClient = new HiveMetaStoreClient(conf);
+        driver = DriverFactory.newDriver(conf);
+
+    }
+
+    @Before
+    public void setup() throws Exception {
+        firstEventId = msClient.getCurrentNotificationEventId().getEventId();
+        DummyRawStoreFailEvent.setEventSucceed(true);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+
+        if (msClient != null) {
+            msClient.close();
+        }
+        if (driver != null) {
+            driver.close();
+        }
+        conf = null;
+    }
+
+
+    @Test
+    public void cleanupNotifs() throws Exception {
+        Database db = new Database("cleanup1", "no description", testTempDir, 
emptyParameters);
+        msClient.createDatabase(db);
+        msClient.dropDatabase("cleanup1");
+
+        LOG.info("Pulling events immediately after 
createDatabase/dropDatabase");
+        NotificationEventResponse rsp = 
msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(2, rsp.getEventsSize());
+
+        // sleep for expiry time, and then fetch again
+        // sleep twice the TTL interval - things should have been cleaned by 
then.
+        Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+        LOG.info("Pulling events again after cleanup");
+        NotificationEventResponse rsp2 = 
msClient.getNextNotification(firstEventId, 0, null);
+        LOG.info("second trigger done");
+        assertEquals(0, rsp2.getEventsSize());
+    }
+
+    /**
+     * Test makes sure that if you use the API {@link 
HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean, 
NotificationFilter)}
+     * does not error out if the events are cleanedup.
+     */
+    @Test
+    public void skipCleanedUpEvents() throws Exception {
+        Database db = new Database("cleanup1", "no description", testTempDir, 
emptyParameters);
+        msClient.createDatabase(db);
+        msClient.dropDatabase("cleanup1");
+
+        // sleep for expiry time, and then fetch again
+        // sleep twice the TTL interval - things should have been cleaned by 
then.
+        Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+        db = new Database("cleanup2", "no description", testTempDir, 
emptyParameters);
+        msClient.createDatabase(db);
+        msClient.dropDatabase("cleanup2");
+
+        // the firstEventId is before the cleanup happened, so we should just 
receive the
+        // events which remaining after cleanup.
+        NotificationEventRequest request = new NotificationEventRequest();
+        request.setLastEvent(firstEventId);
+        request.setMaxEvents(-1);
+        NotificationEventResponse rsp2 = msClient.getNextNotification(request, 
true, null);
+        assertEquals(2, rsp2.getEventsSize());
+        // when we pass the allowGapsInEvents as false the API should error out
+        Exception ex = null;
+        try {
+            NotificationEventResponse rsp = 
msClient.getNextNotification(request, false, null);
+        } catch (Exception e) {
+            ex = e;
+        }
+        assertNotNull(ex);
+    }
+
+    @Test
+    public void cleanupNotificationWithError() throws Exception {
+        Database db = new Database("cleanup1", "no description", testTempDir, 
emptyParameters);
+        msClient.createDatabase(db);
+        msClient.dropDatabase("cleanup1");
+
+        LOG.info("Pulling events immediately after 
createDatabase/dropDatabase");
+        NotificationEventResponse rsp = 
msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(2, rsp.getEventsSize());
+        //this simulates that cleaning thread will error out while cleaning 
the notifications
+        DummyRawStoreFailEvent.setEventSucceed(false);
+        // sleep for expiry time, and then fetch again
+        // sleep twice the TTL interval - things should have been cleaned by 
then.
+        Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+        LOG.info("Pulling events again after failing to cleanup");
+        NotificationEventResponse rsp2 = 
msClient.getNextNotification(firstEventId, 0, null);
+        LOG.info("second trigger done");
+        assertEquals(2, rsp2.getEventsSize());
+        DummyRawStoreFailEvent.setEventSucceed(true);
+        Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+        LOG.info("Pulling events again after cleanup");
+        rsp2 = msClient.getNextNotification(firstEventId, 0, null);
+        LOG.info("third trigger done");
+        assertEquals(0, rsp2.getEventsSize());
+    }
+}
+
+
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 100ee24e1fa..073fdc877f6 100644
--- 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -19,7 +19,6 @@
 package org.apache.hive.hcatalog.listener;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
@@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -57,7 +54,6 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -67,12 +63,11 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.TxnType;
-import org.apache.hadoop.hive.metastore.client.builder.TableBuilder;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.AlterDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.BatchAcidWriteEvent;
 import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
@@ -82,9 +77,6 @@ import 
org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
 import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
-import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
-import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
-import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -110,6 +102,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.hive.hcatalog.data.Pair;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -118,13 +111,12 @@ import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.junit.Ignore;
+
 
 /**
  * Tests DbNotificationListener when used as a transactional event listener
  * (hive.metastore.transactional.event.listeners)
  */
[email protected]("TestDbNotificationListener is unstable HIVE-23680")
 public class TestDbNotificationListener {
   private static final Logger LOG = 
LoggerFactory.getLogger(TestDbNotificationListener.class
       .getName());
@@ -211,6 +203,11 @@ public class TestDbNotificationListener {
       pushEventId(EventType.ALTER_TABLE, tableEvent);
     }
 
+    @Override
+    public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws 
MetaException {
+      pushEventId(EventType.ALTER_DATABASE, dbEvent);
+    }
+
     @Override
     public void onAddPartition (AddPartitionEvent partitionEvent) throws 
MetaException {
       pushEventId(EventType.ADD_PARTITION, partitionEvent);
@@ -251,18 +248,6 @@ public class TestDbNotificationListener {
       pushEventId(EventType.INSERT, insertEvent);
     }
 
-    public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
-      pushEventId(EventType.OPEN_TXN, openTxnEvent);
-    }
-
-    public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws 
MetaException {
-      pushEventId(EventType.COMMIT_TXN, commitTxnEvent);
-    }
-
-    public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
-      pushEventId(EventType.ABORT_TXN, abortTxnEvent);
-    }
-
     public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws 
MetaException {
       pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent);
     }
@@ -312,6 +297,18 @@ public class TestDbNotificationListener {
     DummyRawStoreFailEvent.setEventSucceed(true);
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() {
+
+    if (msClient != null) {
+      msClient.close();
+    }
+    if (driver != null) {
+      driver.close();
+    }
+
+  }
+
   @After
   public void tearDown() {
     MockMetaStoreEventListener.clearEvents();
@@ -392,6 +389,8 @@ public class TestDbNotificationListener {
     String newDesc = "test database";
     Database dbAfter = dbBefore.deepCopy();
     dbAfter.setDescription(newDesc);
+    dbAfter.setOwnerName("test2");
+    dbAfter.setOwnerType(PrincipalType.USER);
     msClient.alterDatabase(dbName, dbAfter);
     dbAfter = msClient.getDatabase(dbName);
 
@@ -1071,88 +1070,6 @@ public class TestDbNotificationListener {
     testEventCounts(defaultDbName, firstEventId, null, null, 3);
   }
 
-  @Test
-  public void openTxn() throws Exception {
-    msClient.openTxn("me", TxnType.READ_ONLY);
-    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
-    assertEquals(0, rsp.getEventsSize());
-
-    msClient.openTxn("me", TxnType.DEFAULT);
-    rsp = msClient.getNextNotification(firstEventId, 0, null);
-    assertEquals(1, rsp.getEventsSize());
-
-    NotificationEvent event = rsp.getEvents().get(0);
-    assertEquals(firstEventId + 1, event.getEventId());
-    assertTrue(event.getEventTime() >= startTime);
-    assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
-  }
-
-  @Test
-  public void abortTxn() throws Exception {
-    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
-    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
-    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
-    assertEquals(1, rsp.getEventsSize());
-
-    msClient.abortTxns(Collections.singletonList(txnId1));
-    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-    assertEquals(0, rsp.getEventsSize());
-
-    msClient.abortTxns(Collections.singletonList(txnId2));
-    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-    assertEquals(1, rsp.getEventsSize());
-
-    NotificationEvent event = rsp.getEvents().get(0);
-    assertEquals(firstEventId + 2, event.getEventId());
-    assertTrue(event.getEventTime() >= startTime);
-    assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
-  }
-
-  @Test
-  public void rollbackTxn() throws Exception {
-    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
-    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
-    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
-    assertEquals(1, rsp.getEventsSize());
-
-    msClient.rollbackTxn(txnId1);
-    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-    assertEquals(0, rsp.getEventsSize());
-
-    msClient.rollbackTxn(txnId2);
-    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-    assertEquals(1, rsp.getEventsSize());
-
-    NotificationEvent event = rsp.getEvents().get(0);
-    assertEquals(firstEventId + 2, event.getEventId());
-    assertTrue(event.getEventTime() >= startTime);
-    assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
-  }
-
-  @Test
-  public void commitTxn() throws Exception {
-    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
-    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
-
-    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
-    assertEquals(1, rsp.getEventsSize());
-
-    msClient.commitTxn(txnId1);
-    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-    assertEquals(0, rsp.getEventsSize());
-
-    msClient.commitTxn(txnId2);
-    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
-    assertEquals(1, rsp.getEventsSize());
-
-    NotificationEvent event = rsp.getEvents().get(0);
-    assertEquals(firstEventId + 2, event.getEventId());
-    assertTrue(event.getEventTime() >= startTime);
-    assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
-  }
-
   @Test
   public void insertTable() throws Exception {
     String defaultDbName = "default";
@@ -1420,7 +1337,6 @@ public class TestDbNotificationListener {
   }
 
   @Test
-  @Ignore("HIVE-23401")
   public void sqlInsertTable() throws Exception {
     String defaultDbName = "default";
     String tblName = "sqlins";
@@ -1532,7 +1448,7 @@ public class TestDbNotificationListener {
     // Event 5, 6, 7
     driver.run("insert into table " + tblName + " partition (ds = 'today') 
values (2)");
     // Event 8, 9, 10
-    driver.run("insert into table " + tblName + " partition (ds) values (3, 
'today')");
+    driver.run("insert into table " + tblName + " partition (ds = 'today') 
values (3)");
     // Event 9, 10
     driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");
 
@@ -1545,9 +1461,9 @@ public class TestDbNotificationListener {
     // Event 10, 11, 12
     driver.run("insert into table " + tblName + " partition (ds = 'yesterday') 
values (2)");
     // Event 12, 13, 14
-    driver.run("insert into table " + tblName + " partition (ds) values (3, 
'yesterday')");
+    driver.run("insert into table " + tblName + " partition (ds = 'yesterday') 
values (3)");
     // Event 15, 16, 17
-    driver.run("insert into table " + tblName + " partition (ds) values (3, 
'tomorrow')");
+    driver.run("insert into table " + tblName + " partition (ds = 'tomorrow') 
values (2)");
     // Event 18
     driver.run("alter table " + tblName + " drop partition (ds = 'tomorrow')");
     // Event 19, 20, 21
@@ -1664,86 +1580,5 @@ public class TestDbNotificationListener {
     assertTrue(files.hasNext());
   }
 
-  @Test
-  public void cleanupNotifs() throws Exception {
-    Database db = new Database("cleanup1", "no description", testTempDir, 
emptyParameters);
-    msClient.createDatabase(db);
-    msClient.dropDatabase("cleanup1");
 
-    LOG.info("Pulling events immediately after createDatabase/dropDatabase");
-    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
-    assertEquals(2, rsp.getEventsSize());
-
-    // sleep for expiry time, and then fetch again
-    // sleep twice the TTL interval - things should have been cleaned by then.
-    Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-    LOG.info("Pulling events again after cleanup");
-    NotificationEventResponse rsp2 = 
msClient.getNextNotification(firstEventId, 0, null);
-    LOG.info("second trigger done");
-    assertEquals(0, rsp2.getEventsSize());
-  }
-
-  /**
-   * Test makes sure that if you use the API {@link 
HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean, 
NotificationFilter)}
-   * does not error out if the events are cleanedup.
-   */
-  @Test
-  public void skipCleanedUpEvents() throws Exception {
-    Database db = new Database("cleanup1", "no description", testTempDir, 
emptyParameters);
-    msClient.createDatabase(db);
-    msClient.dropDatabase("cleanup1");
-
-    // sleep for expiry time, and then fetch again
-    // sleep twice the TTL interval - things should have been cleaned by then.
-    Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-    db = new Database("cleanup2", "no description", testTempDir, 
emptyParameters);
-    msClient.createDatabase(db);
-    msClient.dropDatabase("cleanup2");
-
-    // the firstEventId is before the cleanup happened, so we should just 
receive the
-    // events which remaining after cleanup.
-    NotificationEventRequest request = new NotificationEventRequest();
-    request.setLastEvent(firstEventId);
-    request.setMaxEvents(-1);
-    NotificationEventResponse rsp2 = msClient.getNextNotification(request, 
true, null);
-    assertEquals(2, rsp2.getEventsSize());
-    // when we pass the allowGapsInEvents as false the API should error out
-    Exception ex = null;
-    try {
-      NotificationEventResponse rsp = msClient.getNextNotification(request, 
false, null);
-    } catch (Exception e) {
-      ex = e;
-    }
-    assertNotNull(ex);
-  }
-
-  @Test
-  public void cleanupNotificationWithError() throws Exception {
-    Database db = new Database("cleanup1", "no description", testTempDir, 
emptyParameters);
-    msClient.createDatabase(db);
-    msClient.dropDatabase("cleanup1");
-
-    LOG.info("Pulling events immediately after createDatabase/dropDatabase");
-    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 
0, null);
-    assertEquals(2, rsp.getEventsSize());
-    //this simulates that cleaning thread will error out while cleaning the 
notifications
-    DummyRawStoreFailEvent.setEventSucceed(false);
-    // sleep for expiry time, and then fetch again
-    // sleep twice the TTL interval - things should have been cleaned by then.
-    Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-    LOG.info("Pulling events again after failing to cleanup");
-    NotificationEventResponse rsp2 = 
msClient.getNextNotification(firstEventId, 0, null);
-    LOG.info("second trigger done");
-    assertEquals(2, rsp2.getEventsSize());
-    DummyRawStoreFailEvent.setEventSucceed(true);
-    Thread.sleep(EVENTS_TTL * 2 * 1000);
-
-    LOG.info("Pulling events again after cleanup");
-    rsp2 = msClient.getNextNotification(firstEventId, 0, null);
-    LOG.info("third trigger done");
-    assertEquals(0, rsp2.getEventsSize());
-  }
 }
diff --git 
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
new file mode 100644
index 00000000000..3b9853684a4
--- /dev/null
+++ 
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import java.util.Collections;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+public class TestTransactionalDbNotificationListener {
+    private static IMetaStoreClient msClient;
+    private static IDriver driver;
+
+    private int startTime;
+    private long firstEventId;
+
+
+    @SuppressWarnings("rawtypes")
+    @BeforeClass
+    public static void connectToMetastore() throws Exception {
+        HiveConf conf = new HiveConf();
+        conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
+                "org.apache.hive.hcatalog.listener.DbNotificationListener");
+        conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+        conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+        conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, 
DummyRawStoreFailEvent.class.getName());
+        MetastoreConf.setVar(conf, 
MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY, 
JSONMessageEncoder.class.getName());
+        conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+                
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+        SessionState.start(new CliSessionState(conf));
+        TestTxnDbUtil.setConfValues(conf);
+        TestTxnDbUtil.prepDb(conf);
+        msClient = new HiveMetaStoreClient(conf);
+        driver = DriverFactory.newDriver(conf);
+
+    }
+
+    @Before
+    public void setup() throws Exception {
+        long now = System.currentTimeMillis() / 1000;
+        startTime = 0;
+        if (now > Integer.MAX_VALUE) {
+            fail("Bummer, time has fallen over the edge");
+        } else {
+            startTime = (int) now;
+        }
+        firstEventId = msClient.getCurrentNotificationEventId().getEventId();
+        DummyRawStoreFailEvent.setEventSucceed(true);
+    }
+
+    @Test
+    public void openTxn() throws Exception {
+        msClient.openTxn("me", TxnType.READ_ONLY);
+        NotificationEventResponse rsp = 
msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(0, rsp.getEventsSize());
+
+        msClient.openTxn("me", TxnType.DEFAULT);
+        rsp = msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        NotificationEvent event = rsp.getEvents().get(0);
+        assertEquals(firstEventId + 1, event.getEventId());
+        assertTrue(event.getEventTime() >= startTime);
+        assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
+    }
+
+    @Test
+    public void abortTxn() throws Exception {
+
+        long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+        long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+        NotificationEventResponse rsp = 
msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        msClient.abortTxns(Collections.singletonList(txnId1));
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        assertEquals(0, rsp.getEventsSize());
+
+        msClient.abortTxns(Collections.singletonList(txnId2));
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        NotificationEvent event = rsp.getEvents().get(0);
+        assertEquals(firstEventId + 2, event.getEventId());
+        assertTrue(event.getEventTime() >= startTime);
+        assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+    }
+
+    @Test
+    public void rollbackTxn() throws Exception {
+        long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+        long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+        NotificationEventResponse rsp = 
msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        msClient.rollbackTxn(txnId1);
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        assertEquals(0, rsp.getEventsSize());
+
+        msClient.rollbackTxn(txnId2);
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        NotificationEvent event = rsp.getEvents().get(0);
+        assertEquals(firstEventId + 2, event.getEventId());
+        assertTrue(event.getEventTime() >= startTime);
+        assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+    }
+
+    @Test
+    public void commitTxn() throws Exception {
+        long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+        long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+        NotificationEventResponse rsp = 
msClient.getNextNotification(firstEventId, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        msClient.commitTxn(txnId1);
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        assertEquals(0, rsp.getEventsSize());
+
+        msClient.commitTxn(txnId2);
+        rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+        assertEquals(1, rsp.getEventsSize());
+
+        NotificationEvent event = rsp.getEvents().get(0);
+        assertEquals(firstEventId + 2, event.getEventId());
+        assertTrue(event.getEventTime() >= startTime);
+        assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
+    }
+
+}
\ No newline at end of file

Reply via email to