This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 62b6081852b4851437f554aa68aa8321099568b0
Author: Venu Reddy <[email protected]>
AuthorDate: Fri Sep 15 20:25:46 2023 +0530

    IMPALA-12399: (addendum) Fix possible infinite loop and do not set
    the event skip list in the notification request used to update latestEventId_
    
    The following issues are fixed:
    1. A possible infinite loop in
    MetastoreEventsProcessor#getNextMetastoreEventsInBatches()
    when OPEN_TXN is the last event in HMS at the moment the
    method is invoked. Because OPEN_TXN is filtered out by the
    event skip list, HMS returns an empty response and the fetch
    position does not advance. The method now returns when the
    response contains no events.
    2. latestEventId_ must reflect the latest event in HMS, so the
    event skip list must not be set in the notification request
    used to update latestEventId_.
    3. Update lastSyncedEventId_ when the returned event list is
    empty because of the event skip list in the notification
    request.
    
    Testing:
    - Added tests and executed all existing tests.
    
    Change-Id: Idb4b8c3db23d39226f10b33cca4e6a1ab271b925
    Reviewed-on: http://gerrit.cloudera.org:8080/20487
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  8 +-
 .../org/apache/impala/compat/MetastoreShim.java    | 13 +++-
 .../catalog/events/MetastoreEventsProcessor.java   | 87 ++++++++++++++-------
 .../events/MetastoreEventsProcessorTest.java       | 88 +++++++++++++++++-----
 4 files changed, 150 insertions(+), 46 deletions(-)

diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 9fab5e384..cd6a97a81 100644
--- 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -457,9 +457,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
    * eventIds returned.
    *
    * @see 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient#getNextNotification
+   * @param msClient Metastore client
+   * @param eventRequest Notification event request
+   * @param isSkipUnwantedEventTypes Whether to set skip event types in request
+   * @return NotificationEventResponse
+   * @throws TException
    */
   public static NotificationEventResponse getNextNotification(IMetaStoreClient 
msClient,
-      NotificationEventRequest eventRequest) throws TException {
+      NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes)
+      throws TException {
     return getThriftClient(msClient).get_next_notification(eventRequest);
   }
 
diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index d36b46444..504e13a9c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -583,10 +583,19 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
   /**
    * Wrapper around IMetaStoreClient.getThriftClient().get_next_notification() 
to deal
    * with added arguments.
+   *
+   * @param msClient Metastore client
+   * @param eventRequest Notification event request
+   * @param isSkipUnwantedEventTypes Whether to set skip event types in request
+   * @return NotificationEventResponse
+   * @throws TException
    */
   public static NotificationEventResponse getNextNotification(IMetaStoreClient 
msClient,
-      NotificationEventRequest eventRequest) throws TException {
-    
eventRequest.setEventTypeSkipList(MetastoreEventsProcessor.getEventSkipList());
+      NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes)
+      throws TException {
+    if (isSkipUnwantedEventTypes) {
+      
eventRequest.setEventTypeSkipList(MetastoreEventsProcessor.getEventSkipList());
+    }
     return msClient.getThriftClient().get_next_notification(eventRequest);
   }
 
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 0e0e54c4f..87650d702 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -318,8 +318,12 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
         NotificationEventRequest eventRequest = new NotificationEventRequest();
         eventRequest.setMaxEvents(batchSize);
         eventRequest.setLastEvent(currentEventId);
-        NotificationEventResponse notificationEventResponse = MetastoreShim
-            .getNextNotification(msc.getHiveClient(), eventRequest);
+        NotificationEventResponse notificationEventResponse =
+            MetastoreShim.getNextNotification(msc.getHiveClient(), 
eventRequest, true);
+        if (notificationEventResponse.getEvents().isEmpty()) {
+          // Possible to receive empty list due to event skip list in request
+          return result;
+        }
         for (NotificationEvent event : notificationEventResponse.getEvents()) {
           // if no filter is provided we add all the events
           if (filter == null || filter.accept(event)) result.add(event);
@@ -689,6 +693,15 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     return lastSyncedEventId_.get();
   }
 
+  /**
+   * Returns the current value of latestEventId_. This method is not 
thread-safe and
+   * only to be used for testing purposes
+   */
+  @VisibleForTesting
+  public long getLatestEventId() {
+    return latestEventId_.get();
+  }
+
   @VisibleForTesting
   void startScheduler() {
     Preconditions.checkState(pollingFrequencyInSec_ > 0);
@@ -813,6 +826,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
    * NotificationEvents are filtered using the NotificationFilter provided if 
it is not
    * null.
    * @param eventId The returned events are all after this given event id.
+   * @param currentEventId Current event id on metastore
    * @param getAllEvents If this is true all the events since eventId are 
returned.
    *                     Note that Hive MetaStore can limit the response to a 
specific
    *                     maximum number of limit based on the value of 
configuration
@@ -826,22 +840,15 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
    * @throws MetastoreNotificationFetchException In case of exceptions from 
HMS.
    */
   public List<NotificationEvent> getNextMetastoreEvents(final long eventId,
-      final boolean getAllEvents, @Nullable final NotificationFilter filter)
+      final long currentEventId, final boolean getAllEvents,
+      @Nullable final NotificationFilter filter)
       throws MetastoreNotificationFetchException {
+    // no new events since we last polled
+    if (currentEventId <= eventId) {
+      return Collections.emptyList();
+    }
     final Timer.Context context = 
metrics_.getTimer(EVENTS_FETCH_DURATION_METRIC).time();
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // fetch the current notification event id. We assume that the polling 
interval
-      // is small enough that most of these polling operations result in zero 
new
-      // events. In such a case, fetching current notification event id is 
much faster
-      // (and cheaper on HMS side) instead of polling for events directly
-      CurrentNotificationEventId currentNotificationEventId =
-          msClient.getHiveClient().getCurrentNotificationEventId();
-      long currentEventId = currentNotificationEventId.getEventId();
-
-      // no new events since we last polled
-      if (currentEventId <= eventId) {
-        return Collections.emptyList();
-      }
       int batchSize = getAllEvents ? -1 : EVENTS_BATCH_SIZE_PER_RPC;
       // we use the thrift API directly instead of
       // HiveMetastoreClient#getNextNotification because the HMS client can 
throw an
@@ -849,8 +856,8 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       NotificationEventRequest eventRequest = new NotificationEventRequest();
       eventRequest.setLastEvent(eventId);
       eventRequest.setMaxEvents(batchSize);
-      NotificationEventResponse response = MetastoreShim
-          .getNextNotification(msClient.getHiveClient(), eventRequest);
+      NotificationEventResponse response =
+          MetastoreShim.getNextNotification(msClient.getHiveClient(), 
eventRequest, true);
       LOG.info(String.format("Received %d events. Start event id : %d",
           response.getEvents().size(), eventId));
       if (filter == null) return response.getEvents();
@@ -874,8 +881,21 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
    */
   @VisibleForTesting
   protected List<NotificationEvent> getNextMetastoreEvents()
+      throws MetastoreNotificationFetchException, CatalogException {
+    return getNextMetastoreEvents(getCurrentEventId());
+  }
+
+  /**
+   * Fetch the next batch of NotificationEvents from metastore. The default 
batch size is
+   * <code>EVENTS_BATCH_SIZE_PER_RPC</code>
+   * @param currentEventId Current event id on metastore
+   * @return List of NotificationEvents from metastore since lastSyncedEventId
+   * @throws MetastoreNotificationFetchException
+   */
+  @VisibleForTesting
+  public List<NotificationEvent> getNextMetastoreEvents(long currentEventId)
       throws MetastoreNotificationFetchException {
-    return getNextMetastoreEvents(lastSyncedEventId_.get(), false, null);
+    return getNextMetastoreEvents(lastSyncedEventId_.get(), currentEventId, 
false, null);
   }
 
   /**
@@ -895,9 +915,13 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
             currentStatus, lastSyncedEventId_.get()));
         return;
       }
-
-      List<NotificationEvent> events = getNextMetastoreEvents();
-      processEvents(events);
+      // fetch the current notification event id. We assume that the polling 
interval
+      // is small enough that most of these polling operations result in zero 
new
+      // events. In such a case, fetching current notification event id is 
much faster
+      // (and cheaper on HMS side) instead of polling for events directly
+      long currentEventId = getCurrentEventId();
+      List<NotificationEvent> events = getNextMetastoreEvents(currentEventId);
+      processEvents(currentEventId, events);
     } catch (MetastoreNotificationFetchException ex) {
       // No need to change the EventProcessor state to error since we want the
       // EventProcessor to continue getting new events after HMS is back up.
@@ -925,7 +949,8 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   /**
    * Update the latest event id regularly so we know how far we are lagging 
behind.
    */
-  private void updateLatestEventId() {
+  @VisibleForTesting
+  public void updateLatestEventId() {
     EventProcessorStatus currentStatus = eventProcessorStatus_;
     if (currentStatus != EventProcessorStatus.ACTIVE) {
       return;
@@ -942,8 +967,8 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       NotificationEventRequest eventRequest = new NotificationEventRequest();
       eventRequest.setLastEvent(currentEventId - 1);
       eventRequest.setMaxEvents(1);
-      NotificationEventResponse response = MetastoreShim
-          .getNextNotification(msClient.getHiveClient(), eventRequest);
+      NotificationEventResponse response = MetastoreShim.getNextNotification(
+          msClient.getHiveClient(), eventRequest, false);
       Iterator<NotificationEvent> eventIter = response.getEventsIterator();
       // Events could be empty if they are just cleaned up.
       if (!eventIter.hasNext()) return;
@@ -1050,14 +1075,24 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   /**
    * Process the given list of notification events. Useful for tests which 
provide a list
    * of events
+   * @param currentEventId Current event id on metastore
+   * @param events List of NotificationEvents
+   * @throws MetastoreNotificationException
    */
   @VisibleForTesting
-  protected void processEvents(List<NotificationEvent> events)
+  protected void processEvents(long currentEventId, List<NotificationEvent> 
events)
       throws MetastoreNotificationException {
     currentEvent_ = null;
     // update the events received metric before returning
     metrics_.getMeter(EVENTS_RECEIVED_METRIC).mark(events.size());
-    if (events.isEmpty()) return;
+    if (events.isEmpty()) {
+      if (lastSyncedEventId_.get() < currentEventId) {
+        // Possible to receive empty list due to event skip list in 
notification event
+        // request. Update the last synced event id with current event id on 
metastore
+        lastSyncedEventId_.set(currentEventId);
+      }
+      return;
+    }
     final Timer.Context context =
         metrics_.getTimer(EVENTS_PROCESS_DURATION_METRIC).time();
     Map<MetastoreEvent, Long> eventProcessingTime = new HashMap<>();
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index b1dd8b097..1ee5da6c5 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -1431,7 +1431,7 @@ public class MetastoreEventsProcessorTest {
 
     @Override
     public List<NotificationEvent> getNextMetastoreEvents()
-        throws MetastoreNotificationFetchException {
+        throws MetastoreNotificationFetchException, CatalogException {
       // Throw exception roughly half of the time
       Random rand = new Random();
       if (rand.nextInt(10) % 2 == 0){
@@ -1513,14 +1513,16 @@ public class MetastoreEventsProcessorTest {
     assertNotNull("Table should have been found after create table statement",
         catalog_.getTable(TEST_DB_NAME, testTblName));
     loadTable(testTblName);
-    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    long currentEventId = eventsProcessor_.getCurrentEventId();
+    List<NotificationEvent> events =
+        eventsProcessor_.getNextMetastoreEvents(currentEventId);
     // the first create table event should not change anything to the 
catalogd's
     // created table
     assertEquals(3, events.size());
     Table existingTable = catalog_.getTable(TEST_DB_NAME, testTblName);
     long id = MetastoreShim.getTableId(existingTable.getMetaStoreTable());
     assertEquals("CREATE_TABLE", events.get(0).getEventType());
-    eventsProcessor_.processEvents(Lists.newArrayList(events.get(0)));
+    eventsProcessor_.processEvents(currentEventId, 
Lists.newArrayList(events.get(0)));
     // after processing the create_table the original table should still 
remain the same
     long testId = MetastoreShim.getTableId(catalog_.getTable(TEST_DB_NAME,
         testTblName).getMetaStoreTable());
@@ -1531,7 +1533,7 @@ public class MetastoreEventsProcessorTest {
     long numFilteredEvents =
         eventsProcessor_.getMetrics()
             
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount();
-    eventsProcessor_.processEvents(Lists.newArrayList(events.get(1)));
+    eventsProcessor_.processEvents(currentEventId, 
Lists.newArrayList(events.get(1)));
     // Verify that the drop_table event is skipped and the metric is 
incremented.
     assertEquals(numFilteredEvents + 1, eventsProcessor_.getMetrics()
         
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
@@ -1540,7 +1542,7 @@ public class MetastoreEventsProcessorTest {
         + "is stale", catalog_.getTable(TEST_DB_NAME, testTblName));
     // the final create table event should also be ignored since its a 
self-event
     assertEquals("CREATE_TABLE", events.get(2).getEventType());
-    eventsProcessor_.processEvents(Lists.newArrayList(events.get(2)));
+    eventsProcessor_.processEvents(currentEventId, 
Lists.newArrayList(events.get(2)));
     assertFalse(
         "Table should have been loaded since the create_table should be " + 
"ignored",
         catalog_.getTable(TEST_DB_NAME,
@@ -1566,7 +1568,7 @@ public class MetastoreEventsProcessorTest {
     List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
     assertEquals(2, events.size());
 
-    eventsProcessor_.processEvents(events);
+    eventsProcessor_.processEvents();
     assertNotNull(catalog_.getDb(TEST_DB_NAME));
     assertNotNull(catalog_.getTable(TEST_DB_NAME, testTblName));
     assertFalse("Table should have been loaded since it was already latest", 
catalog_
@@ -1577,7 +1579,7 @@ public class MetastoreEventsProcessorTest {
     events = eventsProcessor_.getNextMetastoreEvents();
     // should have 1 drop_table event
     assertEquals(1, events.size());
-    eventsProcessor_.processEvents(events);
+    eventsProcessor_.processEvents();
     // dropping a non-existant table should cause event processor to go into 
error state
     assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
     assertNull(catalog_.getTable(TEST_DB_NAME, testTblName));
@@ -1606,12 +1608,14 @@ public class MetastoreEventsProcessorTest {
     assertNull(catalog_.getDb(TEST_DB_NAME));
     createDatabaseFromImpala(TEST_DB_NAME, "second");
     assertNotNull(catalog_.getDb(TEST_DB_NAME));
-    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
+    long currentEventId = eventsProcessor_.getCurrentEventId();
+    List<NotificationEvent> events =
+        eventsProcessor_.getNextMetastoreEvents(currentEventId);
     // should have 3 events for create,drop and create database
     assertEquals(3, events.size());
 
     assertEquals("CREATE_DATABASE", events.get(0).getEventType());
-    eventsProcessor_.processEvents(Lists.newArrayList(events.get(0)));
+    eventsProcessor_.processEvents(currentEventId, 
Lists.newArrayList(events.get(0)));
     // create_database event should have no effect since catalogD has already 
a later
     // version of database with the same name.
     assertNotNull(catalog_.getDb(TEST_DB_NAME));
@@ -1620,7 +1624,7 @@ public class MetastoreEventsProcessorTest {
 
     // now process drop_database event
     assertEquals("DROP_DATABASE", events.get(1).getEventType());
-    eventsProcessor_.processEvents(Lists.newArrayList(events.get(1)));
+    eventsProcessor_.processEvents(currentEventId, 
Lists.newArrayList(events.get(1)));
     // database should not be dropped since catalogD is at the latest state
     assertNotNull(catalog_.getDb(TEST_DB_NAME));
     assertEquals("second",
@@ -1628,7 +1632,7 @@ public class MetastoreEventsProcessorTest {
 
     // the third create_database event should have no effect too
     assertEquals("CREATE_DATABASE", events.get(2).getEventType());
-    eventsProcessor_.processEvents(Lists.newArrayList(events.get(2)));
+    eventsProcessor_.processEvents(currentEventId, 
Lists.newArrayList(events.get(2)));
     assertNotNull(catalog_.getDb(TEST_DB_NAME));
     assertEquals("second",
         catalog_.getDb(TEST_DB_NAME).getMetaStoreDb().getDescription());
@@ -2074,7 +2078,7 @@ public class MetastoreEventsProcessorTest {
   }
 
   private void cleanUpTblsForFlagTests(String dbName)
-      throws TException, MetastoreNotificationFetchException {
+      throws TException, MetastoreNotificationFetchException, CatalogException 
{
     if (catalog_.getDb(dbName) == null) return;
 
     dropDatabaseCascade(dbName);
@@ -3265,15 +3269,65 @@ public class MetastoreEventsProcessorTest {
    */
   @Test
   public void testSkipFetchOpenTransactionEvent() throws Exception {
+    long currentEventId = eventsProcessor_.getCurrentEventId();
     try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
-      // Make an empty transaction
+      // 1. Fetch notification events after open and commit transaction
       long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      assertEquals(currentEventId + 1, eventsProcessor_.getCurrentEventId());
+      // Verify latest event id
+      eventsProcessor_.updateLatestEventId();
+      assertEquals(currentEventId + 1, eventsProcessor_.getLatestEventId());
+
+      MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+      assertEquals(currentEventId + 2, eventsProcessor_.getCurrentEventId());
+      // Verify the latest event id again
+      eventsProcessor_.updateLatestEventId();
+      assertEquals(currentEventId + 2, eventsProcessor_.getLatestEventId());
+      // Commit transaction event alone is returned from metastore
+      List<NotificationEvent> events = 
eventsProcessor_.getNextMetastoreEvents();
+      assertEquals(1, events.size());
+      assertEquals(MetastoreEventType.COMMIT_TXN,
+          MetastoreEventType.from(events.get(0).getEventType()));
+      // Verify last synced event id before and after processEvents
+      assertEquals(currentEventId, eventsProcessor_.getLastSyncedEventId());
+      eventsProcessor_.processEvents();
+      assertEquals(currentEventId + 2, 
eventsProcessor_.getLastSyncedEventId());
+
+      // 2. Fetch notification events right after open transaction
+      currentEventId = eventsProcessor_.getCurrentEventId();
+      txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      assertEquals(currentEventId + 1, eventsProcessor_.getCurrentEventId());
+      // Open transaction event is not returned from metastore
+      events = eventsProcessor_.getNextMetastoreEvents();
+      assertEquals(0, events.size());
+      // Verify last synced event id before and after processEvents
+      assertEquals(currentEventId, eventsProcessor_.getLastSyncedEventId());
+      eventsProcessor_.processEvents();
+      assertEquals(currentEventId + 1, 
eventsProcessor_.getLastSyncedEventId());
+
       MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+      assertEquals(currentEventId + 2, eventsProcessor_.getCurrentEventId());
+    }
+  }
+
+  /**
+   * Test fetching events in batch when last occurred event is open transaction
+   * @throws Exception
+   */
+  @Test
+  public void testFetchEventsInBatchWithOpenTxnAsLastEvent() throws Exception {
+    long currentEventId = eventsProcessor_.getCurrentEventId();
+    try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+      long txnId = MetastoreShim.openTransaction(client.getHiveClient());
+      assertEquals(currentEventId + 1, eventsProcessor_.getCurrentEventId());
+      List<NotificationEvent> events =
+          MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
+              eventsProcessor_.catalog_, currentEventId, null);
+      // Open transaction event is not returned from metastore
+      assertEquals(0, events.size());
+      MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
+      assertEquals(currentEventId + 2, eventsProcessor_.getCurrentEventId());
     }
-    List<NotificationEvent> events = eventsProcessor_.getNextMetastoreEvents();
-    assertEquals(1, events.size());
-    assertEquals(MetastoreEventType.COMMIT_TXN,
-        MetastoreEventType.from(events.get(0).getEventType()));
   }
 
   private void createDatabase(String catName, String dbName,

Reply via email to