This is an automated email from the ASF dual-hosted git repository.

vihangk1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d034138  HIVE-25796: Allow metastore clients to fetch remaining events 
if some of the events are cleaned up (Vihang Karajgaonkar, reviewed by Sourabh 
Goyal)
d034138 is described below

commit d034138b976c48930e5973d179c94ea1c2a6130f
Author: Vihang Karajgaonkar <[email protected]>
AuthorDate: Fri Dec 17 10:46:00 2021 -0800

    HIVE-25796: Allow metastore clients to fetch remaining events if some of 
the events are cleaned up (Vihang Karajgaonkar, reviewed by Sourabh Goyal)
---
 .../listener/TestDbNotificationListener.java       | 38 ++++++++++++++++++++++
 .../hadoop/hive/metastore/HiveMetaStoreClient.java | 17 +++++++++-
 .../hadoop/hive/metastore/IMetaStoreClient.java    | 21 ++++++++++++
 .../metastore/HiveMetaStoreClientPreCatalog.java   | 19 +++++++++--
 4 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 7973f0a..100ee24 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -19,6 +19,7 @@
 package org.apache.hive.hcatalog.listener;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
 import org.apache.hadoop.hive.metastore.TableType;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -1681,6 +1684,41 @@ public class TestDbNotificationListener {
     assertEquals(0, rsp2.getEventsSize());
   }
 
+  /**
+   * Tests that {@link HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean, NotificationFilter)}
+   * skips events that were cleaned up when gaps are allowed, and errors out otherwise.
+   */
+  @Test
+  public void skipCleanedUpEvents() throws Exception {
+    Database db = new Database("cleanup1", "no description", testTempDir, emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("cleanup1");
+
+    // Sleep for twice the TTL interval so that the events generated for "cleanup1"
+    // have been cleaned up by the time we fetch again.
+    Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+    db = new Database("cleanup2", "no description", testTempDir, emptyParameters);
+    msClient.createDatabase(db);
+    msClient.dropDatabase("cleanup2");
+
+    // firstEventId predates the cleanup, so with gaps allowed we should only receive
+    // the events which remain after the cleanup (those generated for "cleanup2").
+    NotificationEventRequest request = new NotificationEventRequest();
+    request.setLastEvent(firstEventId);
+    request.setMaxEvents(-1);
+    NotificationEventResponse rsp2 = msClient.getNextNotification(request, true, null);
+    assertEquals(2, rsp2.getEventsSize());
+    // When allowGapsInEventIds is false, the gap left by the cleanup must make the API error out.
+    Exception ex = null;
+    try {
+      msClient.getNextNotification(request, false, null);
+    } catch (Exception e) {
+      ex = e;
+    }
+    assertNotNull("Expected an error for the gap in event ids when gaps are not allowed", ex);
+  }
+
   @Test
   public void cleanupNotificationWithError() throws Exception {
     Database db = new Database("cleanup1", "no description", testTempDir, 
emptyParameters);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 7d67315..a077ab1 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -86,6 +86,7 @@ import org.apache.thrift.transport.layered.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -4211,6 +4212,20 @@ public class HiveMetaStoreClient implements 
IMetaStoreClient, AutoCloseable {
                                                        NotificationFilter 
filter) throws TException {
     NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
     rqst.setMaxEvents(maxEvents);
+    return getNextNotificationsInternal(rqst, false, filter);
+  }
+
+  @Override
+  public NotificationEventResponse getNextNotification(NotificationEventRequest request,
+      boolean allowGapsInEventIds, NotificationFilter filter) throws TException {
+    return getNextNotificationsInternal(request, allowGapsInEventIds, filter);
+  }
+
+  @Nullable
+  private NotificationEventResponse getNextNotificationsInternal(
+      NotificationEventRequest rqst, boolean allowGapsInEventIds,
+      NotificationFilter filter) throws TException {
+    long lastEventId = rqst.getLastEvent();
     NotificationEventResponse rsp = client.get_next_notification(rqst);
     LOG.debug("Got back {} events", rsp!= null ? rsp.getEventsSize() : 0);
     NotificationEventResponse filtered = new NotificationEventResponse();
@@ -4219,7 +4234,7 @@ public class HiveMetaStoreClient implements 
IMetaStoreClient, AutoCloseable {
       long prevEventId = lastEventId;
       for (NotificationEvent e : rsp.getEvents()) {
         LOG.debug("Got event with id : {}", e.getEventId());
-        if (e.getEventId() != nextEventId) {
+        if (!allowGapsInEventIds && e.getEventId() != nextEventId) {
           if (e.getEventId() == prevEventId) {
             LOG.error("NOTIFICATION_LOG table has multiple events with the 
same event Id {}. " +
                     "Something went wrong when inserting notification events.  
Bootstrap the system " +
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index e647ee6..53f3c02 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -3648,6 +3648,27 @@ public interface IMetaStoreClient {
                                                 NotificationFilter filter) 
throws TException;
 
   /**
+   * Get the next set of notifications from the database.
+   * @param request The {@link NotificationEventRequest} request to be sent to 
the server
+   *                to fetch the next set of events.
+   * @param allowGapsInEventIds If this flag is true, the returned event ids 
may contain
+   *                            gaps in the event ids. This could happen if on 
the server
+   *                            side some of the events since the requested 
eventId have
+   *                            been garbage collected. If the flag is false, 
the method
+   *                            will throw {@link MetaException} if the 
returned events
+   *                            from the server are not in sequence from the 
requested
+   *                            event id.
+   * @param filter User provided filter to remove unwanted events.  If null, 
all events will be
+   *               returned.
+   * @return list of notifications, sorted by eventId.  It is guaranteed that 
the events are in
+   * the order that the operations were done on the database.
+   * @throws TException
+   */
+  @InterfaceAudience.LimitedPrivate({"HCatalog"})
+  NotificationEventResponse getNextNotification(NotificationEventRequest 
request,
+      boolean allowGapsInEventIds, NotificationFilter filter) throws 
TException;
+
+  /**
    * Get the last used notification event id.
    * @return last used id
    * @throws TException
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index db141c4..7bdfa51 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -84,6 +84,7 @@ import org.apache.thrift.transport.layered.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -2736,13 +2737,27 @@ public class HiveMetaStoreClientPreCatalog implements 
IMetaStoreClient, AutoClos
                                                        NotificationFilter 
filter) throws TException {
     NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
     rqst.setMaxEvents(maxEvents);
-    NotificationEventResponse rsp = client.get_next_notification(rqst);
+    return getNextNotificationEventsInternal(rqst, false, filter);
+  }
+
+  @Override
+  public NotificationEventResponse getNextNotification(NotificationEventRequest request,
+      boolean allowGapsInEventIds, NotificationFilter filter) throws TException {
+    return getNextNotificationEventsInternal(request, allowGapsInEventIds, filter);
+  }
+
+  @NotNull
+  private NotificationEventResponse getNextNotificationEventsInternal(
+      NotificationEventRequest request, boolean allowGapsInEventIds,
+      NotificationFilter filter) throws TException {
+    long lastEventId = request.getLastEvent();
+    NotificationEventResponse rsp = client.get_next_notification(request);
     LOG.debug("Got back " + rsp.getEventsSize() + " events");
     NotificationEventResponse filtered = new NotificationEventResponse();
     if (rsp != null && rsp.getEvents() != null) {
       long nextEventId = lastEventId + 1;
       for (NotificationEvent e : rsp.getEvents()) {
-        if (e.getEventId() != nextEventId) {
+        if (!allowGapsInEventIds && e.getEventId() != nextEventId) {
           LOG.error("Requested events are found missing in NOTIFICATION_LOG 
table. Expected: {}, Actual: {}. "
                   + "Probably, cleaner would've cleaned it up. "
                   + "Try setting higher value for 
hive.metastore.event.db.listener.timetolive. "

Reply via email to