This is an automated email from the ASF dual-hosted git repository.
vihangk1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d034138 HIVE-25796: Allow metastore clients to fetch remaining events
if some of the events are cleaned up (Vihang Karajgaonkar, reviewed by Sourabh
Goyal)
d034138 is described below
commit d034138b976c48930e5973d179c94ea1c2a6130f
Author: Vihang Karajgaonkar <[email protected]>
AuthorDate: Fri Dec 17 10:46:00 2021 -0800
HIVE-25796: Allow metastore clients to fetch remaining events if some of
the events are cleaned up (Vihang Karajgaonkar, reviewed by Sourabh Goyal)
---
.../listener/TestDbNotificationListener.java | 38 ++++++++++++++++++++++
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 17 +++++++++-
.../hadoop/hive/metastore/IMetaStoreClient.java | 21 ++++++++++++
.../metastore/HiveMetaStoreClientPreCatalog.java | 19 +++++++++--
4 files changed, 92 insertions(+), 3 deletions(-)
diff --git
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 7973f0a..100ee24 100644
---
a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++
b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -19,6 +19,7 @@
package org.apache.hive.hcatalog.listener;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
import org.apache.hadoop.hive.metastore.TableType;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -1681,6 +1684,41 @@ public class TestDbNotificationListener {
assertEquals(0, rsp2.getEventsSize());
}
+ /**
+ * Verifies that {@link HiveMetaStoreClient#getNextNotification(NotificationEventRequest, boolean,
+ * NotificationFilter)} does not error out when called with {@code allowGapsInEventIds = true}
+ * after older events have been cleaned up, and that the same request fails when called with
+ * {@code allowGapsInEventIds = false}.
+ */
+ @Test
+ public void skipCleanedUpEvents() throws Exception {
+ Database db = new Database("cleanup1", "no description", testTempDir,
emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("cleanup1");
+
+ // Sleep for twice the events TTL so that the events for "cleanup1" are expected
+ // to have been cleaned up by the time we fetch again.
+ Thread.sleep(EVENTS_TTL * 2 * 1000);
+
+ db = new Database("cleanup2", "no description", testTempDir,
emptyParameters);
+ msClient.createDatabase(db);
+ msClient.dropDatabase("cleanup2");
+
+ // firstEventId precedes the cleanup, so with gaps allowed we expect to receive only
+ // the events that remain after cleanup (presumably the create/drop of "cleanup2").
+ NotificationEventRequest request = new NotificationEventRequest();
+ request.setLastEvent(firstEventId);
+ request.setMaxEvents(-1);
+ NotificationEventResponse rsp2 = msClient.getNextNotification(request,
true, null);
+ assertEquals(2, rsp2.getEventsSize());
+ // With allowGapsInEventIds = false the same request is expected to fail because of the gap.
+ // NOTE(review): the local `rsp` below is unused and only "some Exception" is asserted;
+ // consider asserting the specific exception type (e.g. Assert.assertThrows) instead.
+ Exception ex = null;
+ try {
+ NotificationEventResponse rsp = msClient.getNextNotification(request,
false, null);
+ } catch (Exception e) {
+ ex = e;
+ }
+ assertNotNull(ex);
+ }
+
@Test
public void cleanupNotificationWithError() throws Exception {
Database db = new Database("cleanup1", "no description", testTempDir,
emptyParameters);
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 7d67315..a077ab1 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -86,6 +86,7 @@ import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -4211,6 +4212,20 @@ public class HiveMetaStoreClient implements
IMetaStoreClient, AutoCloseable {
NotificationFilter
filter) throws TException {
NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
rqst.setMaxEvents(maxEvents);
+ return getNextNotificationsInternal(rqst, false, filter);
+ }
+
+ /**
+ * Fetches the next batch of notification events described by {@code request}. When
+ * {@code allowGapsInEventIds} is true, non-consecutive event ids in the server response are
+ * not treated as an error; the legacy (lastEventId, maxEvents, filter) overload passes false.
+ */
+ @Override
+ public NotificationEventResponse
getNextNotification(NotificationEventRequest request,
+ boolean allowGapsInEventIds, NotificationFilter filter) throws
TException {
+ return getNextNotificationsInternal(request, allowGapsInEventIds, filter);
+ }
+
+ @Nullable
+ private NotificationEventResponse getNextNotificationsInternal(
+ NotificationEventRequest rqst, boolean allowGapsInEventIds,
+ NotificationFilter filter) throws TException {
+ long lastEventId = rqst.getLastEvent();
NotificationEventResponse rsp = client.get_next_notification(rqst);
LOG.debug("Got back {} events", rsp!= null ? rsp.getEventsSize() : 0);
NotificationEventResponse filtered = new NotificationEventResponse();
@@ -4219,7 +4234,7 @@ public class HiveMetaStoreClient implements
IMetaStoreClient, AutoCloseable {
long prevEventId = lastEventId;
for (NotificationEvent e : rsp.getEvents()) {
LOG.debug("Got event with id : {}", e.getEventId());
- if (e.getEventId() != nextEventId) {
+ if (!allowGapsInEventIds && e.getEventId() != nextEventId) {
if (e.getEventId() == prevEventId) {
LOG.error("NOTIFICATION_LOG table has multiple events with the
same event Id {}. " +
"Something went wrong when inserting notification events.
Bootstrap the system " +
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index e647ee6..53f3c02 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -3648,6 +3648,27 @@ public interface IMetaStoreClient {
NotificationFilter filter)
throws TException;
 /**
+ * Get the next set of notifications from the database.
+ * @param request The {@link NotificationEventRequest} sent to the server to fetch the next set
+ *                of events; its last event id is the id after which events are requested.
+ * @param allowGapsInEventIds If true, the returned event ids may contain gaps. This can happen
+ *                            when, on the server side, some of the events after the requested
+ *                            event id have already been garbage collected. If false, the method
+ *                            throws an exception when the events returned by the server are not
+ *                            consecutive starting from the requested event id.
+ *                            NOTE(review): confirm which exception type the client
+ *                            implementations throw in this case; the gap check is done
+ *                            client-side.
+ * @param filter User provided filter to remove unwanted events. If null, all events will be
+ *               returned.
+ * @return the response holding the notifications, sorted by eventId. It is guaranteed that the
+ *         events are in the order that the operations were done on the database.
+ * @throws TException if the Thrift call to the metastore server fails.
+ */
+ @InterfaceAudience.LimitedPrivate({"HCatalog"})
+ NotificationEventResponse getNextNotification(NotificationEventRequest
request,
+ boolean allowGapsInEventIds, NotificationFilter filter) throws
TException;
+
+ /**
* Get the last used notification event id.
* @return last used id
* @throws TException
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index db141c4..7bdfa51 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -84,6 +84,7 @@ import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -2736,13 +2737,27 @@ public class HiveMetaStoreClientPreCatalog implements
IMetaStoreClient, AutoClos
NotificationFilter
filter) throws TException {
NotificationEventRequest rqst = new NotificationEventRequest(lastEventId);
rqst.setMaxEvents(maxEvents);
- NotificationEventResponse rsp = client.get_next_notification(rqst);
+ return getNextNotificationEventsInternal(rqst, false, filter);
+ }
+
+ /**
+ * Fetches the next batch of notification events described by {@code request}. When
+ * {@code allowGapsInEventIds} is true, non-consecutive event ids in the server response are
+ * not treated as an error; the legacy (lastEventId, maxEvents, filter) overload passes false.
+ */
+ @Override
+ public NotificationEventResponse
getNextNotification(NotificationEventRequest request,
+ boolean allowGapsInEventIds, NotificationFilter filter) throws
TException {
+ return getNextNotificationEventsInternal(request, allowGapsInEventIds,
filter);
+ }
+
+ @NotNull
+ private NotificationEventResponse getNextNotificationEventsInternal(
+ NotificationEventRequest request, boolean allowGapsInEventIds,
+ NotificationFilter filter) throws TException {
+ long lastEventId = request.getLastEvent();
+ NotificationEventResponse rsp = client.get_next_notification(request);
LOG.debug("Got back " + rsp.getEventsSize() + " events");
NotificationEventResponse filtered = new NotificationEventResponse();
if (rsp != null && rsp.getEvents() != null) {
long nextEventId = lastEventId + 1;
for (NotificationEvent e : rsp.getEvents()) {
- if (e.getEventId() != nextEventId) {
+ if (!allowGapsInEventIds && e.getEventId() != nextEventId) {
LOG.error("Requested events are found missing in NOTIFICATION_LOG
table. Expected: {}, Actual: {}. "
+ "Probably, cleaner would've cleaned it up. "
+ "Try setting higher value for
hive.metastore.event.db.listener.timetolive. "