This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new e2aeb0a6b IMPALA-14535: Improve wait for HMS events sync with 
hierarchical event processing
e2aeb0a6b is described below

commit e2aeb0a6bda7012f8aba083f74c61cbaaa16d635
Author: Venu Reddy <[email protected]>
AuthorDate: Tue Dec 16 03:26:10 2025 +0530

    IMPALA-14535: Improve wait for HMS events sync with hierarchical event 
processing
    
    With hierarchical event processing, MetastoreEventsProcessor's
    lastSyncedEventId_ represents the last event dispatched to the
    EventExecutorService. Events are queued to the appropriate
    DbEventExecutors and/or TableEventExecutors for processing,
    and are removed from these queues only after they are
    processed by the respective executor threads.
    
    When a wait for HMS events sync request is issued, the
    following sequence is executed:
    1. The current notification event id is fetched from HMS.
    2. The request waits until MetastoreEventsProcessor has fetched
       and dispatched all events up to and including this
       notification event id to the EventExecutorService.
    3. The list of databases and tables requiring synchronization
       is extracted from the request.
    4. To determine whether synchronization for a required
       database and/or table is complete, the relevant
       DbEventExecutor and TableEventExecutor instances are checked
       directly. Synchronization is considered complete only when
       the required DbProcessor and TableProcessor instances have
       no pending events with ids up to and including the captured
       notification event id.
    
    This approach guarantees that all HMS events that existed at
    the time of the sync request have been fully processed for the
    requested databases and tables.
    
    Testing:
        - Added an FE test and ran the existing tests.
    
    Change-Id: I55cea4cb8e04860202e56e1b1bf2596613b4946c
    Reviewed-on: http://gerrit.cloudera.org:8080/23789
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Quanlong Huang <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |   2 +-
 .../impala/catalog/events/DbEventExecutor.java     |  92 +++++++++-
 .../catalog/events/EventExecutorService.java       |  49 ++++-
 .../impala/catalog/events/MetastoreEvents.java     |   3 +-
 .../catalog/events/MetastoreEventsProcessor.java   | 198 +++++++++++++++++++--
 .../impala/catalog/events/TableEventExecutor.java  |  27 +++
 .../catalog/events/EventExecutorServiceTest.java   |  85 +++++++--
 tests/custom_cluster/test_events_custom_configs.py |   6 +-
 8 files changed, 421 insertions(+), 41 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 3f717c8fd..a8535854e 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -270,7 +270,7 @@ DEFINE_string(common_hms_event_types, 
"ADD_PARTITION,ALTER_PARTITION,DROP_PARTIT
 
 DEFINE_bool(enable_hierarchical_event_processing, true,
     "This configuration is used to enable hierarchical event processing. The 
default "
-    "value is false. When enabled, events are fetched, dispatched and 
processed in "
+    "value is true. When enabled, events are fetched, dispatched and processed 
in "
     "different threads.");
 
 DEFINE_int32(num_db_event_executors, 5,
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java 
b/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
index 90e3a8a9a..c4b21b0af 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
@@ -23,11 +23,11 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
@@ -132,7 +132,8 @@ public class DbEventExecutor {
     private final Queue<DbBarrierEvent> barrierEvents_ = new 
ConcurrentLinkedQueue<>();
 
     // TableProcessors for the database
-    private final Set<TableProcessor> tableProcessors_ = 
ConcurrentHashMap.newKeySet();
+    private final Map<String, TableProcessor> tableProcessors_ =
+        new ConcurrentHashMap<>();
 
     /**
      * Indicates whether DbProcessor is terminating. Events are dispatched and 
processed
@@ -228,14 +229,15 @@ public class DbEventExecutor {
      * @param event Metastore event
      */
     private void dispatchTableEvent(MetastoreEvent event) {
-      String fqTableName = (event.getDbName() + '.' + 
event.getTableName()).toLowerCase();
+      String tableName = event.getTableName().toLowerCase();
+      String fqTableName = dbEventExecutor_.getFqTableName(event.getDbName(), 
tableName);
       synchronized (processorLock_) {
         if (isTerminating()) return;
         TableEventExecutor tableEventExecutor =
             dbEventExecutor_.getOrAssignTableEventExecutor(fqTableName);
         TableProcessor tableProcessor =
             tableEventExecutor.getOrCreateTableProcessor(fqTableName);
-        tableProcessors_.add(tableProcessor);
+        tableProcessors_.put(tableName, tableProcessor);
         // Prepend all the outstanding db barrier events to TableProcessor so 
that they
         // do not process the table events received before these db barrier 
events are
         // processed
@@ -254,7 +256,7 @@ public class DbEventExecutor {
       synchronized (processorLock_) {
         if (isTerminating()) return;
         barrierEvent = new DbBarrierEvent(event);
-        tableProcessors_.forEach(tableProcessor -> {
+        tableProcessors_.values().forEach(tableProcessor -> {
           if (!tableProcessor.isEmpty()) tableProcessor.enqueue(barrierEvent);
         });
         barrierEvents_.offer(barrierEvent);
@@ -364,9 +366,10 @@ public class DbEventExecutor {
      *              And false to indicate the removal of idle TableProcessor 
if any.
      */
     private void cleanIfNecessary(boolean force) {
-      Iterator<TableProcessor> it = tableProcessors_.iterator();
+      Iterator<Map.Entry<String, TableProcessor>> it =
+          tableProcessors_.entrySet().iterator();
       while (it.hasNext()) {
-        TableProcessor tableProcessor = it.next();
+        TableProcessor tableProcessor = it.next().getValue();
         if (force || tableProcessor.canBeRemoved()) {
           TableEventExecutor tableEventExecutor = 
tableProcessor.getTableEventExecutor();
           
tableEventExecutor.deleteTableProcessor(tableProcessor.getTableName());
@@ -468,6 +471,23 @@ public class DbEventExecutor {
       processDbEvents();
       cleanIfNecessary(false);
     }
+
+    /**
+     * Determines whether all events with event ids less than or equal to the 
given event
+     * id have been processed for this database.
+     * @param eventId Event id up to which events are expected to be processed
+     * @return True if all events up to the given event id are processed. 
False otherwise
+     */
+    private boolean isProcessed(long eventId) {
+      synchronized (processorLock_) {
+        // Return false if the DbProcessor is being terminated. May happen 
when event
+        // processor is being paused or stopped concurrently.
+        if (isTerminating()) return false;
+        if (!inEvents_.isEmpty() &&
+            inEvents_.peek().getEventId() <= eventId) return false;
+        return barrierEvents_.isEmpty() || barrierEvents_.peek().getEventId() 
> eventId;
+      }
+    }
   }
 
   DbEventExecutor(MetastoreEventsProcessor eventProcessor, String name, long 
interval,
@@ -632,7 +652,63 @@ public class DbEventExecutor {
     dbProcessor.enqueue(event);
   }
 
-  @VisibleForTesting
+  /**
+   * Gets the fully qualified table name.
+   * @param dbName Database name
+   * @param tableName Table name
+   * @return Fully qualified table name
+   */
+  String getFqTableName(String dbName, String tableName) {
+    return dbName + '.' + tableName;
+  }
+
+  /**
+   * Determines whether all events with event ids less than or equal to the 
given event id
+   * have been processed for the given database.
+   * @param dbName Database name
+   * @param eventId Event id up to which events are expected to be processed
+   * @return True if all events up to the given event id are processed. False 
otherwise
+   */
+  boolean isProcessed(String dbName, long eventId) {
+    DbProcessor dbProcessor = dbProcessors_.get(dbName);
+    if (dbProcessor == null) return true;
+    return dbProcessor.isProcessed(eventId);
+  }
+
+  /**
+   * Determines whether all events with event ids less than or equal to the 
given event id
+   * have been processed for the given table.
+   * @param dbName Database name
+   * @param tableName Table name
+   * @param eventId Event id up to which events are expected to be processed
+   * @return True if all events up to the given event id are processed. False 
otherwise
+   */
+  boolean isProcessed(String dbName, String tableName, long eventId) {
+    if (!isProcessed(dbName, eventId)) {
+      return false;
+    }
+    TableEventExecutor eventExecutor =
+        getTableEventExecutor(getFqTableName(dbName,  tableName));
+    if (eventExecutor == null) return true;
+    return eventExecutor.isProcessed(getFqTableName(dbName, tableName), 
eventId);
+  }
+
+  /**
+   * Gets all table names mapped to executors for the given database.
+   * @param dbName Database name
+   * @return List of table names
+   */
+  List<String> getTableNames(String dbName) {
+    DbProcessor dbProcessor = dbProcessors_.get(dbName);
+    if (dbProcessor == null) return Collections.emptyList();
+    return new ArrayList<>(dbProcessor.tableProcessors_.keySet());
+  }
+
+  /**
+   * Gets the table event executor for the given table.
+   * @param fqTableName Fully qualified table name
+   * @return TableEventExecutor
+   */
   TableEventExecutor getTableEventExecutor(String fqTableName) {
     Preconditions.checkNotNull(fqTableName);
     return tableToEventExecutor_.get(fqTableName);
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java 
b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
index cfedc8922..1de27ff96 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
@@ -21,7 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -43,7 +45,6 @@ import 
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreTableEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.PseudoAbortTxnEvent;
-import org.apache.impala.common.Pair;
 import org.apache.impala.common.PrintUtils;
 import org.apache.impala.compat.MetastoreShim;
 import org.slf4j.Logger;
@@ -578,4 +579,50 @@ public class EventExecutorService {
     if (entry != null) greatestSyncedEventTime = entry.getValue();
     return greatestSyncedEventTime;
   }
+
+  /**
+   * Gets all the database names mapped to executors.
+   * @return Set of database names
+   */
+  Set<String> getDbNames() {
+    return new HashSet<>(dbNameToEventExecutor_.keySet());
+  }
+
+  /**
+   * Gets all table names mapped to executors for the given database.
+   * @param dbName Database name
+   * @return List of table names
+   */
+  List<String> getTableNames(String dbName) {
+    DbEventExecutor eventExecutor = getDbEventExecutor(dbName);
+    if (eventExecutor == null) return Collections.emptyList();
+    return eventExecutor.getTableNames(dbName);
+  }
+
+  /**
+   * Determines whether all events with event ids less than or equal to the 
given event id
+   * have been processed for the given database.
+   * @param dbName Database name
+   * @param eventId Event id up to which events are expected to be processed
+   * @return True if all events up to the given event id are processed. False 
otherwise
+   */
+  boolean isProcessed(String dbName, long eventId) {
+    DbEventExecutor eventExecutor = getDbEventExecutor(dbName);
+    if (eventExecutor == null) return true;
+    return eventExecutor.isProcessed(dbName, eventId);
+  }
+
+  /**
+   * Determines whether all events with event ids less than or equal to the 
given event id
+   * have been processed for the given table.
+   * @param dbName Database name
+   * @param tableName Table name
+   * @param eventId Event id up to which events are expected to be processed
+   * @return True if all events up to the given event id are processed. False 
otherwise
+   */
+  boolean isProcessed(String dbName, String tableName, long eventId) {
+    DbEventExecutor eventExecutor = getDbEventExecutor(dbName);
+    if (eventExecutor == null) return true;
+    return eventExecutor.isProcessed(dbName, tableName, eventId);
+  }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index fa93f1d54..4838e9d80 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1049,7 +1049,8 @@ public class MetastoreEvents {
 
     @Override
     public String toString() {
-      return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId_, eventType_);
+      return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId_, eventType_,
+          getTargetName());
     }
 
     protected boolean isOlderThanLastSyncEventId(MetastoreEvent event) {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 46ce22cb9..700d34ab9 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -950,6 +951,13 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     return lastSyncedEventId_.get();
   }
 
+  /**
+   * Gets the last dispatched event id.
+   */
+  public long getLastDispatchedEventId() {
+    return getLastSyncedEventId();
+  }
+
   /**
    * Gets the current value of lastSyncedEventTimeSecs.
    * @return
@@ -1908,6 +1916,15 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       res.addToError_msgs("Failed to fetch current HMS event id: " + 
e.getMessage());
       return res;
     }
+    try {
+      if (req.should_expand_views) expandViews(req);
+    } catch (CatalogException | TException e) {
+      LOG.warn("Failed to expand views used by the query.", e);
+    }
+    if (isHierarchicalEventProcessingEnabled()) {
+      syncMetadataWithHierarchicalEventProcessing(req, res, latestEventId);
+      return res;
+    }
     try {
       waitForEventId = getMinEventIdToWaitFor(req);
     } catch (Throwable e) {
@@ -1916,7 +1933,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
           latestEventId, e);
       waitForEventId = latestEventId;
     }
-    long lastSyncedEventId = getGreatestSyncedEventId();
+    long lastSyncedEventId = getLastSyncedEventId();
     long startMs = System.currentTimeMillis();
     long sleepIntervalMs = Math.min(timeoutMs, hmsEventSyncSleepIntervalMs_);
     // Avoid too many log entries if the waiting interval is smaller than 
500ms.
@@ -1929,7 +1946,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
             lastSyncedEventId, waitForEventId);
       }
       Uninterruptibles.sleepUninterruptibly(sleepIntervalMs, 
TimeUnit.MILLISECONDS);
-      lastSyncedEventId = getGreatestSyncedEventId();
+      lastSyncedEventId = getLastSyncedEventId();
       if (!canProcessEventInCurrentStatus()) {
         res.setStatus_code(TErrorCode.GENERAL);
         res.addToError_msgs(
@@ -1949,17 +1966,150 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     return res;
   }
 
+  /**
+   * Method to synchronize all events up to the given latest event id for the 
requested
+   * databases and tables.
+   * @param req HMS event sync request
+   * @param res Response
+   * @param latestEventId Event id up to which events are expected to be synced
+   */
+  void syncMetadataWithHierarchicalEventProcessing(TWaitForHmsEventRequest req,
+      TStatus res, long latestEventId) {
+    LOG.info("Synchronize events up to latest event ({})", latestEventId);
+    if (getGreatestSyncedEventId() >= latestEventId) {
+      LOG.info("Greatest synced event id ({}) already reached latest event 
({})",
+          getGreatestSyncedEventId(), latestEventId);
+      res.setStatus_code(TErrorCode.OK);
+      return;
+    }
+    Set<String> dbNames = new HashSet<>();
+    Map<String, List<String>> db2Tables = new HashMap<>();
+    long timeoutMs = req.timeout_s * 1000L;
+    long sleepIntervalMs = Math.min(timeoutMs, hmsEventSyncSleepIntervalMs_);
+    long startMs = System.currentTimeMillis();
+    int numIters = 0;
+    int logIntervals = Math.max(1, 1000 / hmsEventSyncSleepIntervalMs_);
+    // Wait for all events up to the latest event are dispatched to 
DbEventExecutors
+    while (getLastDispatchedEventId() < latestEventId
+        && System.currentTimeMillis() - startMs < timeoutMs) {
+      if (numIters++ % logIntervals == 0) {
+        LOG.info("Waiting for last dispatched event ({}) to reach latest event 
({})",
+            getLastDispatchedEventId(), latestEventId);
+      }
+      Uninterruptibles.sleepUninterruptibly(sleepIntervalMs, 
TimeUnit.MILLISECONDS);
+    }
+    if (req.want_db_list) {
+      // Get the list of dbs that have recently received/processed metastore 
events at
+      // their respective DbProcessors and are not yet purged by idle 
processor cleanup.
+      // We just make sure all pending events that could change the db list are
+      // processed. So we don't need all the db names from the catalog. 
Coordinator just
+      // wants the changes on db list to make its cached db list up-to-date.
+      dbNames = eventExecutorService_.getDbNames();
+    } else if (req.isSetObject_descs()) {
+      // Get all db names and table names for each of db
+      getRequestedDbTableNames(req, dbNames, db2Tables);
+    }
+    if (!dbNames.isEmpty() && System.currentTimeMillis() - startMs < 
timeoutMs) {
+      LOG.info("Databases to ensure synchronized: {}", Joiner.on(", 
").join(dbNames));
+    }
+    if (req.want_table_list) {
+      numIters = 0;
+      // Wait for all events up to the latest event are dispatched to 
TableEventExecutors
+      while (!dbNames.isEmpty() && System.currentTimeMillis() - startMs < 
timeoutMs) {
+        Iterator<String> it = dbNames.iterator();
+        while (it.hasNext()) {
+          String dbName = it.next();
+          if (eventExecutorService_.isProcessed(dbName, latestEventId)) {
+            LOG.info("Synced database: {}", dbName);
+            it.remove();
+            // Gets the list of tables that have recently received/processed 
metastore
+            // events for the db and are not yet purged by idle processor 
cleanup.
+            List<String> tableNames = 
eventExecutorService_.getTableNames(dbName);
+            if (!tableNames.isEmpty()) db2Tables.put(dbName, tableNames);
+          }
+        }
+        if (!dbNames.isEmpty()) {
+          if (numIters++ % logIntervals == 0) {
+            LOG.info("Waiting to finish sync on database processors");
+          }
+          Uninterruptibles.sleepUninterruptibly(sleepIntervalMs, 
TimeUnit.MILLISECONDS);
+        }
+      }
+    }
+    // Wait till necessary dbs and tables are synced
+    if (!db2Tables.isEmpty() && System.currentTimeMillis() - startMs < 
timeoutMs) {
+      LOG.info("Tables to ensure synchronized: {}",
+          Joiner.on(", ").withKeyValueSeparator(" database with tables ")
+              .join(db2Tables));
+    }
+    numIters = 0;
+    boolean isSyncInProgress = !dbNames.isEmpty() || !db2Tables.isEmpty();
+    while (isSyncInProgress && System.currentTimeMillis() - startMs < 
timeoutMs) {
+      // Remove the synced dbs
+      dbNames.removeIf(dbName -> {
+        boolean isProcessed = eventExecutorService_.isProcessed(dbName, 
latestEventId);
+        if (isProcessed) LOG.info("Synced database: {}", dbName);
+        return isProcessed;
+      });
+      // Remove the synced tables
+      Iterator<Map.Entry<String, List<String>>> it = 
db2Tables.entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<String, List<String>> entry = it.next();
+        entry.getValue().removeIf(tableName -> {
+          boolean isProcessed = 
eventExecutorService_.isProcessed(entry.getKey(),
+              tableName, latestEventId);
+          if (isProcessed) LOG.info("Synced table: {}.{}", entry.getKey(), 
tableName);
+          return isProcessed;
+        });
+        if (entry.getValue().isEmpty()) {
+          it.remove();
+        }
+      }
+      isSyncInProgress = !dbNames.isEmpty() || !db2Tables.isEmpty();
+      if (isSyncInProgress) {
+        if (numIters++ % logIntervals == 0) {
+          LOG.info("Waiting to finish sync on database/table processors");
+        }
+        Uninterruptibles.sleepUninterruptibly(sleepIntervalMs, 
TimeUnit.MILLISECONDS);
+      }
+    }
+    // Time out before syncing dbs and tables
+    if (isSyncInProgress || System.currentTimeMillis() - startMs >= timeoutMs) 
{
+      res.setStatus_code(TErrorCode.GENERAL);
+      StringBuilder error = new StringBuilder();
+      error.append(String.format("Timeout waiting for HMS events to be synced. 
" +
+          "Event id to wait for: %d. Last synced event id: %d", latestEventId,
+          getGreatestSyncedEventId()));
+      if (!dbNames.isEmpty()) {
+        error.append(". Failed to sync databases: ").append(Joiner.on(", ")
+            .join(dbNames));
+      }
+      if (!db2Tables.isEmpty()) {
+        error.append(". Failed to sync tables: ")
+            .append(Joiner.on(", ").withKeyValueSeparator(" database with 
tables ")
+                .join(db2Tables));
+      }
+      error.append(".");
+      res.addToError_msgs(error.toString());
+      return;
+    }
+    LOG.info("All required databases and tables are synced up to event ({})",
+        latestEventId);
+    res.setStatus_code(TErrorCode.OK);
+  }
+
   /**
    * Find the min required event id that should be synced to avoid the query 
using
    * stale metadata.
    */
   private long getMinEventIdToWaitFor(TWaitForHmsEventRequest req)
-      throws MetastoreNotificationFetchException, TException, CatalogException 
{
-    if (req.should_expand_views) expandViews(req);
+      throws MetastoreNotificationFetchException {
+    Preconditions.checkState(!isHierarchicalEventProcessingEnabled(),
+        "Hierarchical event processing is enabled");
     // requiredEventId starts from the last synced event id. While checking 
pending
     // events of a target, we just fetch events after requiredEventId, since 
events
     // before it are decided to be waited for.
-    long requiredEventId = getGreatestSyncedEventId();
+    long requiredEventId = getLastSyncedEventId();
     if (req.want_db_list) {
       return getMinRequiredEventIdForDbList(requiredEventId);
     }
@@ -1969,19 +2119,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       // checked by one RPC.
       Set<String> dbNames = new HashSet<>();
       Map<String, List<String>> db2Tables = new HashMap<>();
-      for (TCatalogObject catalogObject: req.getObject_descs()) {
-        if (catalogObject.isSetDb()) {
-          dbNames.add(catalogObject.getDb().db_name);
-        } else if (catalogObject.isSetTable()) {
-          TTable table = catalogObject.getTable();
-          if (catalog_.getDb(table.db_name) == null) {
-            // We will check existence of missing dbs.
-            dbNames.add(table.db_name);
-          }
-          db2Tables.computeIfAbsent(table.db_name, k -> new ArrayList<>())
-              .add(table.tbl_name);
-        }
-      }
+      getRequestedDbTableNames(req, dbNames, db2Tables);
       // Step 2: Check DB events
       requiredEventId = getMinRequiredEventIdForDb(requiredEventId, dbNames,
           req.want_table_list);
@@ -1996,6 +2134,30 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     return requiredEventId;
   }
 
+  /**
+   * Populates the database names, table names and groups table names by the 
db name for
+   * the given HMS event sync request.
+   * @param req HMS event sync request
+   * @param dbNames database names container
+   * @param db2Tables database name to table names mapping container
+   */
+  private void getRequestedDbTableNames(TWaitForHmsEventRequest req,
+      Set<String> dbNames, Map<String, List<String>> db2Tables) {
+    for (TCatalogObject catalogObject: req.getObject_descs()) {
+      if (catalogObject.isSetDb()) {
+        dbNames.add(catalogObject.getDb().db_name);
+      } else if (catalogObject.isSetTable()) {
+        TTable table = catalogObject.getTable();
+        if (catalog_.getDb(table.db_name) == null) {
+          // We will check existence of missing dbs.
+          dbNames.add(table.db_name);
+        }
+        db2Tables.computeIfAbsent(table.db_name, k -> new ArrayList<>())
+            .add(table.tbl_name);
+      }
+    }
+  }
+
   private static void doForAllObjectNames(TWaitForHmsEventRequest req,
       Function<TTableName, Object> tblFn, @Nullable Function<String, Object> 
dbFn) {
     for (TCatalogObject catalogObject : req.getObject_descs()) {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java 
b/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
index 12eb39f62..d3363a9fe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
@@ -212,6 +212,20 @@ public class TableEventExecutor {
               BackendConfig.INSTANCE.getMinEventProcessorIdleMs();
     }
 
+    /**
+     * Determines whether all events with event ids less than or equal to the 
given event
+     * id have been processed for this table.
+     * @param eventId Event id up to which events are expected to be processed
+     * @return True if all events up to the given event id are processed. 
False otherwise
+     */
+    private boolean isProcessed(long eventId) {
+      synchronized (processorLock_) {
+        // Return false if the TableProcessor is being removed.
+        if (isTerminating()) return false;
+        return events_.isEmpty() || events_.peek().getEventId() > eventId;
+      }
+    }
+
     /**
      * Determines whether TableProcessor is terminating.
      * @return True if terminating. False otherwise
@@ -489,4 +503,17 @@ public class TableEventExecutor {
       eventProcessor_.handleEventProcessException(e.getException(), 
e.getEvent());
     }
   }
+
+  /**
+   * Determines whether all events with event ids less than or equal to the 
given event id
+   * have been processed for the given table.
+   * @param fqTableName Fully qualified table name
+   * @param eventId Event id up to which events are expected to be processed
+   * @return True if all events up to the given event id are processed. False 
otherwise
+   */
+  boolean isProcessed(String fqTableName, long eventId) {
+    TableProcessor tableProcessor = tableProcessors_.get(fqTableName);
+    if (tableProcessor == null) return true;
+    return tableProcessor.isProcessed(eventId);
+  }
 }
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
index 02b6cd218..7f0e24523 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
@@ -27,13 +27,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -199,21 +199,27 @@ public class EventExecutorServiceTest {
     do {
       isNeedProcess = false;
       for (DbEventExecutor dbExecutor : 
eventExecutorService.getDbEventExecutors()) {
-        if (dbExecutor.getOutstandingEventCount() != 0) {
-          isNeedProcess = true;
-          dbExecutor.process();
-        }
-        for (TableEventExecutor tableExecutor : 
dbExecutor.getTableEventExecutors()) {
-          if (tableExecutor.getOutstandingEventCount() != 0) {
-            isNeedProcess = true;
-            tableExecutor.process();
-          }
-        }
+        isNeedProcess |= process(dbExecutor);
       }
     } while (--maxLoopTimes > 0 && isNeedProcess);
     assertTrue("Events should be processed within the limit", maxLoopTimes > 
0);
   }
 
+  private boolean process(DbEventExecutor dbExecutor) {
+    boolean isNeedProcess = false;
+    if (dbExecutor.getOutstandingEventCount() != 0) {
+      isNeedProcess = true;
+      dbExecutor.process();
+    }
+    for (TableEventExecutor tableExecutor : 
dbExecutor.getTableEventExecutors()) {
+      if (tableExecutor.getOutstandingEventCount() != 0) {
+        isNeedProcess = true;
+        tableExecutor.process();
+      }
+    }
+    return isNeedProcess;
+  }
+
   private EventExecutorService createEventExecutorService(int 
numDbEventExecutor,
       int numTableEventExecutor) {
     EventExecutorService eventExecutorService = new 
EventExecutorService(eventsProcessor_,
@@ -911,6 +917,63 @@ public class EventExecutorServiceTest {
     // processed
     assertEquals(lastEventId, eventExecutorService.getGreatestSyncedEventId());
     assertEquals(0, eventExecutorService.getPendingEventCount(lastEventId));
+    eventExecutorService.shutdown(true);
   }
 
+  /**
+   * Tests whether events prior to the latest event id are processed at the 
executors
+   * @throws Exception
+   */
+  @Test
+  public void testEventsProcessedTillLatestEvent() throws Exception {
+    EventExecutorService eventExecutorService = new 
EventExecutorService(eventsProcessor_,
+        2, 2);
+    
eventExecutorService.setStatus(EventExecutorService.EventExecutorStatus.ACTIVE);
+    String t1 = "t1";
+    String t2 = "t2";
+    String t3 = "t3";
+    String t4 = "t4";
+    createDatabase(DB_NAME1);
+    createDatabase(DB_NAME2);
+    createTable(DB_NAME1, t1);
+    createTransactionalTable(DB_NAME1, t2, false);
+    createTable(DB_NAME2, t3);
+    createTransactionalTable(DB_NAME2, t4, true);
+
+    long latestEventId = eventsProcessor_.getCurrentEventId();
+    List<MetastoreEvent> metastoreEvents = eventsProcessor_.getEventsFactory()
+        .getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(),
+            eventsProcessor_.getMetrics());
+    for (MetastoreEvent event : metastoreEvents) {
+      eventExecutorService.dispatch(event);
+    }
+    DbEventExecutor dbExecutor1 = 
eventExecutorService.getDbEventExecutors().get(0);
+    DbEventExecutor dbExecutor2 = 
eventExecutorService.getDbEventExecutors().get(1);
+    assertEquals(new HashSet<>(Arrays.asList(DB_NAME1, DB_NAME2)),
+        eventExecutorService.getDbNames());
+    int maxLoopTimes = 2;
+    do {
+      assertFalse(eventExecutorService.isProcessed(DB_NAME1, latestEventId));
+      assertFalse(eventExecutorService.isProcessed(DB_NAME1, t1, 
latestEventId));
+      assertFalse(eventExecutorService.isProcessed(DB_NAME1, t2, 
latestEventId));
+    } while (process(dbExecutor1) && --maxLoopTimes > 0);
+    assertTrue(eventExecutorService.isProcessed(DB_NAME1, latestEventId));
+    assertTrue(eventExecutorService.isProcessed(DB_NAME1, t1, latestEventId));
+    assertTrue(eventExecutorService.isProcessed(DB_NAME1, t2, latestEventId));
+    assertEquals(new HashSet<>(Arrays.asList(t1, t2)),
+        new HashSet<>(eventExecutorService.getTableNames(DB_NAME1)));
+
+    maxLoopTimes = 2;
+    do {
+      assertFalse(eventExecutorService.isProcessed(DB_NAME2, latestEventId));
+      assertFalse(eventExecutorService.isProcessed(DB_NAME2, t3, 
latestEventId));
+      assertFalse(eventExecutorService.isProcessed(DB_NAME2, t4, 
latestEventId));
+    } while (process(dbExecutor2) && --maxLoopTimes > 0);
+    assertTrue(eventExecutorService.isProcessed(DB_NAME2, latestEventId));
+    assertTrue(eventExecutorService.isProcessed(DB_NAME2, t3, latestEventId));
+    assertTrue(eventExecutorService.isProcessed(DB_NAME2, t4, latestEventId));
+    assertEquals(new HashSet<>(Arrays.asList(t3, t4)),
+        new HashSet<>(eventExecutorService.getTableNames(DB_NAME2)));
+    eventExecutorService.shutdown(true);
+  }
 }
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index 55f7b2ea2..14d7eb1f9 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -27,6 +27,7 @@ from impala_thrift_gen.hive_metastore.ttypes import 
FireEventRequest
 from impala_thrift_gen.hive_metastore.ttypes import FireEventRequestData
 from impala_thrift_gen.hive_metastore.ttypes import InsertEventRequestData
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import ImpalaTestClusterProperties
 from tests.common.impala_connection import ERROR, FINISHED
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
@@ -53,7 +54,7 @@ EVENT_SYNC_QUERY_OPTIONS = {
     "sync_hms_events_wait_time_s": 10,
     "sync_hms_events_strict_mode": True
 }
-
+IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()
 
 def wait_statestore_heartbeat(num_heartbeat=1):
   """Wait for state sync across impalads."""
@@ -2051,6 +2052,9 @@ class 
TestEventSyncFailures(TestEventProcessingCustomConfigsBase):
         "Timeout waiting for HMS events to be synced. Event id to wait for:")
     assert expected_error in client_log
     assert ". Last synced event id: " in client_log
+    if 
IMPALA_TEST_CLUSTER_PROPERTIES.is_hierarchical_event_processing_enabled():
+      assert ". Failed to sync tables: {} database with tables [part].".format(
+        unique_database) in client_log
     profile = client.get_runtime_profile(handle)
     assert "Errors: " + expected_error in profile, profile
     self.verify_timeline_item("Query Compilation", label, profile)


Reply via email to