This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new e2aeb0a6b IMPALA-14535: Improve wait for HMS events sync with
hierarchical event processing
e2aeb0a6b is described below
commit e2aeb0a6bda7012f8aba083f74c61cbaaa16d635
Author: Venu Reddy <[email protected]>
AuthorDate: Tue Dec 16 03:26:10 2025 +0530
IMPALA-14535: Improve wait for HMS events sync with hierarchical event
processing
With hierarchical event processing, MetastoreEventsProcessor's
lastSyncedEventId_ represents the last event dispatched to the
EventExecutorService. Events are queued to the appropriate
DbEventExecutors and/or TableEventExecutors for processing, and
they are removed from these queues only after they have been
processed by the respective executor threads.
When a wait for HMS events sync request is issued, the
following sequence is executed:
1. The current notification event id is fetched from HMS.
2. The request waits until MetastoreEventsProcessor has fetched
and dispatched all events up to and including this notification
event id to the EventExecutorService.
3. The list of databases and tables requiring synchronization
is extracted from the request.
4. To determine whether synchronization for a required
database and/or table is complete, the relevant DbEventExecutor
and TableEventExecutor instances are checked directly.
Synchronization is considered complete only when the required
DbProcessor and TableProcessor instances no longer have any
pending events with ids less than or equal to the captured
notification event id.
This approach guarantees that all HMS events that existed at
the time of the sync request have been fully processed for the
requested databases and tables.
Testing:
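- Added an FE test (EventExecutorServiceTest#testEventsProcessedTillLatestEvent),
  extended a custom cluster test to check the new timeout error message,
  and ran the existing tests.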
Change-Id: I55cea4cb8e04860202e56e1b1bf2596613b4946c
Reviewed-on: http://gerrit.cloudera.org:8080/23789
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Quanlong Huang <[email protected]>
---
be/src/catalog/catalog-server.cc | 2 +-
.../impala/catalog/events/DbEventExecutor.java | 92 +++++++++-
.../catalog/events/EventExecutorService.java | 49 ++++-
.../impala/catalog/events/MetastoreEvents.java | 3 +-
.../catalog/events/MetastoreEventsProcessor.java | 198 +++++++++++++++++++--
.../impala/catalog/events/TableEventExecutor.java | 27 +++
.../catalog/events/EventExecutorServiceTest.java | 85 +++++++--
tests/custom_cluster/test_events_custom_configs.py | 6 +-
8 files changed, 421 insertions(+), 41 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 3f717c8fd..a8535854e 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -270,7 +270,7 @@ DEFINE_string(common_hms_event_types,
"ADD_PARTITION,ALTER_PARTITION,DROP_PARTIT
DEFINE_bool(enable_hierarchical_event_processing, true,
"This configuration is used to enable hierarchical event processing. The
default "
- "value is false. When enabled, events are fetched, dispatched and
processed in "
+ "value is true. When enabled, events are fetched, dispatched and processed
in "
"different threads.");
DEFINE_int32(num_db_event_executors, 5,
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
b/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
index 90e3a8a9a..c4b21b0af 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/DbEventExecutor.java
@@ -23,11 +23,11 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
@@ -132,7 +132,8 @@ public class DbEventExecutor {
private final Queue<DbBarrierEvent> barrierEvents_ = new
ConcurrentLinkedQueue<>();
// TableProcessors for the database
- private final Set<TableProcessor> tableProcessors_ =
ConcurrentHashMap.newKeySet();
+ private final Map<String, TableProcessor> tableProcessors_ =
+ new ConcurrentHashMap<>();
/**
* Indicates whether DbProcessor is terminating. Events are dispatched and
processed
@@ -228,14 +229,15 @@ public class DbEventExecutor {
* @param event Metastore event
*/
private void dispatchTableEvent(MetastoreEvent event) {
- String fqTableName = (event.getDbName() + '.' +
event.getTableName()).toLowerCase();
+ String tableName = event.getTableName().toLowerCase();
+ String fqTableName = dbEventExecutor_.getFqTableName(event.getDbName(),
tableName);
synchronized (processorLock_) {
if (isTerminating()) return;
TableEventExecutor tableEventExecutor =
dbEventExecutor_.getOrAssignTableEventExecutor(fqTableName);
TableProcessor tableProcessor =
tableEventExecutor.getOrCreateTableProcessor(fqTableName);
- tableProcessors_.add(tableProcessor);
+ tableProcessors_.put(tableName, tableProcessor);
// Prepend all the outstanding db barrier events to TableProcessor so
that they
// do not process the table events received before these db barrier
events are
// processed
@@ -254,7 +256,7 @@ public class DbEventExecutor {
synchronized (processorLock_) {
if (isTerminating()) return;
barrierEvent = new DbBarrierEvent(event);
- tableProcessors_.forEach(tableProcessor -> {
+ tableProcessors_.values().forEach(tableProcessor -> {
if (!tableProcessor.isEmpty()) tableProcessor.enqueue(barrierEvent);
});
barrierEvents_.offer(barrierEvent);
@@ -364,9 +366,10 @@ public class DbEventExecutor {
* And false to indicate the removal of idle TableProcessor
if any.
*/
private void cleanIfNecessary(boolean force) {
- Iterator<TableProcessor> it = tableProcessors_.iterator();
+ Iterator<Map.Entry<String, TableProcessor>> it =
+ tableProcessors_.entrySet().iterator();
while (it.hasNext()) {
- TableProcessor tableProcessor = it.next();
+ TableProcessor tableProcessor = it.next().getValue();
if (force || tableProcessor.canBeRemoved()) {
TableEventExecutor tableEventExecutor =
tableProcessor.getTableEventExecutor();
tableEventExecutor.deleteTableProcessor(tableProcessor.getTableName());
@@ -468,6 +471,23 @@ public class DbEventExecutor {
processDbEvents();
cleanIfNecessary(false);
}
+
+ /**
+ * Determines whether all events with event ids less than or equal to the
given event
+ * id have been processed for this database.
+ * @param eventId Event id up to which events are expected to be processed
+ * @return True if all events up to the given event id are processed.
False otherwise
+ */
+ private boolean isProcessed(long eventId) {
+ synchronized (processorLock_) {
+ // Return false if the DbProcessor is being terminated. May happen
when event
+ // processor is being paused or stopped concurrently.
+ if (isTerminating()) return false;
+ if (!inEvents_.isEmpty() &&
+ inEvents_.peek().getEventId() <= eventId) return false;
+ return barrierEvents_.isEmpty() || barrierEvents_.peek().getEventId()
> eventId;
+ }
+ }
}
DbEventExecutor(MetastoreEventsProcessor eventProcessor, String name, long
interval,
@@ -632,7 +652,63 @@ public class DbEventExecutor {
dbProcessor.enqueue(event);
}
- @VisibleForTesting
+ /**
+ * Gets the fully qualified table name.
+ * @param dbName Database name
+ * @param tableName Table name
+ * @return Fully qualified table name
+ */
+ String getFqTableName(String dbName, String tableName) {
+ return dbName + '.' + tableName;
+ }
+
+ /**
+ * Determines whether all events with event ids less than or equal to the
given event id
+ * have been processed for the given database.
+ * @param dbName Database name
+ * @param eventId Event id up to which events are expected to be processed
+ * @return True if all events up to the given event id are processed. False
otherwise
+ */
+ boolean isProcessed(String dbName, long eventId) {
+ DbProcessor dbProcessor = dbProcessors_.get(dbName);
+ if (dbProcessor == null) return true;
+ return dbProcessor.isProcessed(eventId);
+ }
+
+ /**
+ * Determines whether all events with event ids less than or equal to the
given event id
+ * have been processed for the given table.
+ * @param dbName Database name
+ * @param tableName Table name
+ * @param eventId Event id up to which events are expected to be processed
+ * @return True if all events up to the given event id are processed. False
otherwise
+ */
+ boolean isProcessed(String dbName, String tableName, long eventId) {
+ if (!isProcessed(dbName, eventId)) {
+ return false;
+ }
+ TableEventExecutor eventExecutor =
+ getTableEventExecutor(getFqTableName(dbName, tableName));
+ if (eventExecutor == null) return true;
+ return eventExecutor.isProcessed(getFqTableName(dbName, tableName),
eventId);
+ }
+
+ /**
+ * Gets all table names mapped to executors for the given database.
+ * @param dbName Database name
+ * @return List of table names
+ */
+ List<String> getTableNames(String dbName) {
+ DbProcessor dbProcessor = dbProcessors_.get(dbName);
+ if (dbProcessor == null) return Collections.emptyList();
+ return new ArrayList<>(dbProcessor.tableProcessors_.keySet());
+ }
+
+ /**
+ * Gets the table event executor for the given table.
+ * @param fqTableName Fully qualified table name
+ * @return TableEventExecutor
+ */
TableEventExecutor getTableEventExecutor(String fqTableName) {
Preconditions.checkNotNull(fqTableName);
return tableToEventExecutor_.get(fqTableName);
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
index cfedc8922..1de27ff96 100644
---
a/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
+++
b/fe/src/main/java/org/apache/impala/catalog/events/EventExecutorService.java
@@ -21,7 +21,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -43,7 +45,6 @@ import
org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventType;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreTableEvent;
import org.apache.impala.catalog.events.MetastoreEvents.PseudoAbortTxnEvent;
-import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.compat.MetastoreShim;
import org.slf4j.Logger;
@@ -578,4 +579,50 @@ public class EventExecutorService {
if (entry != null) greatestSyncedEventTime = entry.getValue();
return greatestSyncedEventTime;
}
+
+ /**
+ * Gets all the database names mapped to executors.
+ * @return Set of database names
+ */
+ Set<String> getDbNames() {
+ return new HashSet<>(dbNameToEventExecutor_.keySet());
+ }
+
+ /**
+ * Gets all table names mapped to executors for the given database.
+ * @param dbName Database name
+ * @return List of table names
+ */
+ List<String> getTableNames(String dbName) {
+ DbEventExecutor eventExecutor = getDbEventExecutor(dbName);
+ if (eventExecutor == null) return Collections.emptyList();
+ return eventExecutor.getTableNames(dbName);
+ }
+
+ /**
+ * Determines whether all events with event ids less than or equal to the
given event id
+ * have been processed for the given database.
+ * @param dbName Database name
+ * @param eventId Event id up to which events are expected to be processed
+ * @return True if all events up to the given event id are processed. False
otherwise
+ */
+ boolean isProcessed(String dbName, long eventId) {
+ DbEventExecutor eventExecutor = getDbEventExecutor(dbName);
+ if (eventExecutor == null) return true;
+ return eventExecutor.isProcessed(dbName, eventId);
+ }
+
+ /**
+ * Determines whether all events with event ids less than or equal to the
given event id
+ * have been processed for the given table.
+ * @param dbName Database name
+ * @param tableName Table name
+ * @param eventId Event id up to which events are expected to be processed
+ * @return True if all events up to the given event id are processed. False
otherwise
+ */
+ boolean isProcessed(String dbName, String tableName, long eventId) {
+ DbEventExecutor eventExecutor = getDbEventExecutor(dbName);
+ if (eventExecutor == null) return true;
+ return eventExecutor.isProcessed(dbName, tableName, eventId);
+ }
}
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index fa93f1d54..4838e9d80 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1049,7 +1049,8 @@ public class MetastoreEvents {
@Override
public String toString() {
- return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId_, eventType_);
+ return String.format(STR_FORMAT_EVENT_ID_TYPE, eventId_, eventType_,
+ getTargetName());
}
protected boolean isOlderThanLastSyncEventId(MetastoreEvent event) {
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index 46ce22cb9..700d34ab9 100644
---
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -950,6 +951,13 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
return lastSyncedEventId_.get();
}
+ /**
+ * Gets the last dispatched event id.
+ */
+ public long getLastDispatchedEventId() {
+ return getLastSyncedEventId();
+ }
+
/**
* Gets the current value of lastSyncedEventTimeSecs.
* @return
@@ -1908,6 +1916,15 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
res.addToError_msgs("Failed to fetch current HMS event id: " +
e.getMessage());
return res;
}
+ try {
+ if (req.should_expand_views) expandViews(req);
+ } catch (CatalogException | TException e) {
+ LOG.warn("Failed to expand views used by the query.", e);
+ }
+ if (isHierarchicalEventProcessingEnabled()) {
+ syncMetadataWithHierarchicalEventProcessing(req, res, latestEventId);
+ return res;
+ }
try {
waitForEventId = getMinEventIdToWaitFor(req);
} catch (Throwable e) {
@@ -1916,7 +1933,7 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
latestEventId, e);
waitForEventId = latestEventId;
}
- long lastSyncedEventId = getGreatestSyncedEventId();
+ long lastSyncedEventId = getLastSyncedEventId();
long startMs = System.currentTimeMillis();
long sleepIntervalMs = Math.min(timeoutMs, hmsEventSyncSleepIntervalMs_);
// Avoid too many log entries if the waiting interval is smaller than
500ms.
@@ -1929,7 +1946,7 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
lastSyncedEventId, waitForEventId);
}
Uninterruptibles.sleepUninterruptibly(sleepIntervalMs,
TimeUnit.MILLISECONDS);
- lastSyncedEventId = getGreatestSyncedEventId();
+ lastSyncedEventId = getLastSyncedEventId();
if (!canProcessEventInCurrentStatus()) {
res.setStatus_code(TErrorCode.GENERAL);
res.addToError_msgs(
@@ -1949,17 +1966,150 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
return res;
}
+ /**
+ * Method to synchronize all events up to the given latest event id for the
requested
+ * databases and tables.
+ * @param req HMS event sync request
+ * @param res Response
+ * @param latestEventId Event id up to which events are expected to be synced
+ */
+ void syncMetadataWithHierarchicalEventProcessing(TWaitForHmsEventRequest req,
+ TStatus res, long latestEventId) {
+ LOG.info("Synchronize events up to latest event ({})", latestEventId);
+ if (getGreatestSyncedEventId() >= latestEventId) {
+ LOG.info("Greatest synced event id ({}) already reached latest event
({})",
+ getGreatestSyncedEventId(), latestEventId);
+ res.setStatus_code(TErrorCode.OK);
+ return;
+ }
+ Set<String> dbNames = new HashSet<>();
+ Map<String, List<String>> db2Tables = new HashMap<>();
+ long timeoutMs = req.timeout_s * 1000L;
+ long sleepIntervalMs = Math.min(timeoutMs, hmsEventSyncSleepIntervalMs_);
+ long startMs = System.currentTimeMillis();
+ int numIters = 0;
+ int logIntervals = Math.max(1, 1000 / hmsEventSyncSleepIntervalMs_);
+ // Wait for all events up to the latest event are dispatched to
DbEventExecutors
+ while (getLastDispatchedEventId() < latestEventId
+ && System.currentTimeMillis() - startMs < timeoutMs) {
+ if (numIters++ % logIntervals == 0) {
+ LOG.info("Waiting for last dispatched event ({}) to reach latest event
({})",
+ getLastDispatchedEventId(), latestEventId);
+ }
+ Uninterruptibles.sleepUninterruptibly(sleepIntervalMs,
TimeUnit.MILLISECONDS);
+ }
+ if (req.want_db_list) {
+ // Get the list of dbs that have recently received/processed metastore
events at
+ // their respective DbProcessors and are not yet purged by idle
processor cleanup.
+ // We just make sure all pending events that could change the db list are
+ // processed. So we don't need all the db names from the catalog.
Coordinator just
+ // wants the changes on db list to make its cached db list up-to-date.
+ dbNames = eventExecutorService_.getDbNames();
+ } else if (req.isSetObject_descs()) {
+ // Get all db names and table names for each of db
+ getRequestedDbTableNames(req, dbNames, db2Tables);
+ }
+ if (!dbNames.isEmpty() && System.currentTimeMillis() - startMs <
timeoutMs) {
+ LOG.info("Databases to ensure synchronized: {}", Joiner.on(",
").join(dbNames));
+ }
+ if (req.want_table_list) {
+ numIters = 0;
+ // Wait for all events up to the latest event are dispatched to
TableEventExecutors
+ while (!dbNames.isEmpty() && System.currentTimeMillis() - startMs <
timeoutMs) {
+ Iterator<String> it = dbNames.iterator();
+ while (it.hasNext()) {
+ String dbName = it.next();
+ if (eventExecutorService_.isProcessed(dbName, latestEventId)) {
+ LOG.info("Synced database: {}", dbName);
+ it.remove();
+ // Gets the list of tables that have recently received/processed
metastore
+ // events for the db and are not yet purged by idle processor
cleanup.
+ List<String> tableNames =
eventExecutorService_.getTableNames(dbName);
+ if (!tableNames.isEmpty()) db2Tables.put(dbName, tableNames);
+ }
+ }
+ if (!dbNames.isEmpty()) {
+ if (numIters++ % logIntervals == 0) {
+ LOG.info("Waiting to finish sync on database processors");
+ }
+ Uninterruptibles.sleepUninterruptibly(sleepIntervalMs,
TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ // Wait till necessary dbs and tables are synced
+ if (!db2Tables.isEmpty() && System.currentTimeMillis() - startMs <
timeoutMs) {
+ LOG.info("Tables to ensure synchronized: {}",
+ Joiner.on(", ").withKeyValueSeparator(" database with tables ")
+ .join(db2Tables));
+ }
+ numIters = 0;
+ boolean isSyncInProgress = !dbNames.isEmpty() || !db2Tables.isEmpty();
+ while (isSyncInProgress && System.currentTimeMillis() - startMs <
timeoutMs) {
+ // Remove the synced dbs
+ dbNames.removeIf(dbName -> {
+ boolean isProcessed = eventExecutorService_.isProcessed(dbName,
latestEventId);
+ if (isProcessed) LOG.info("Synced database: {}", dbName);
+ return isProcessed;
+ });
+ // Remove the synced tables
+ Iterator<Map.Entry<String, List<String>>> it =
db2Tables.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, List<String>> entry = it.next();
+ entry.getValue().removeIf(tableName -> {
+ boolean isProcessed =
eventExecutorService_.isProcessed(entry.getKey(),
+ tableName, latestEventId);
+ if (isProcessed) LOG.info("Synced table: {}.{}", entry.getKey(),
tableName);
+ return isProcessed;
+ });
+ if (entry.getValue().isEmpty()) {
+ it.remove();
+ }
+ }
+ isSyncInProgress = !dbNames.isEmpty() || !db2Tables.isEmpty();
+ if (isSyncInProgress) {
+ if (numIters++ % logIntervals == 0) {
+ LOG.info("Waiting to finish sync on database/table processors");
+ }
+ Uninterruptibles.sleepUninterruptibly(sleepIntervalMs,
TimeUnit.MILLISECONDS);
+ }
+ }
+ // Time out before syncing dbs and tables
+ if (isSyncInProgress || System.currentTimeMillis() - startMs >= timeoutMs)
{
+ res.setStatus_code(TErrorCode.GENERAL);
+ StringBuilder error = new StringBuilder();
+ error.append(String.format("Timeout waiting for HMS events to be synced.
" +
+ "Event id to wait for: %d. Last synced event id: %d", latestEventId,
+ getGreatestSyncedEventId()));
+ if (!dbNames.isEmpty()) {
+ error.append(". Failed to sync databases: ").append(Joiner.on(", ")
+ .join(dbNames));
+ }
+ if (!db2Tables.isEmpty()) {
+ error.append(". Failed to sync tables: ")
+ .append(Joiner.on(", ").withKeyValueSeparator(" database with
tables ")
+ .join(db2Tables));
+ }
+ error.append(".");
+ res.addToError_msgs(error.toString());
+ return;
+ }
+ LOG.info("All required databases and tables are synced up to event ({})",
+ latestEventId);
+ res.setStatus_code(TErrorCode.OK);
+ }
+
/**
* Find the min required event id that should be synced to avoid the query
using
* stale metadata.
*/
private long getMinEventIdToWaitFor(TWaitForHmsEventRequest req)
- throws MetastoreNotificationFetchException, TException, CatalogException
{
- if (req.should_expand_views) expandViews(req);
+ throws MetastoreNotificationFetchException {
+ Preconditions.checkState(!isHierarchicalEventProcessingEnabled(),
+ "Hierarchical event processing is enabled");
// requiredEventId starts from the last synced event id. While checking
pending
// events of a target, we just fetch events after requiredEventId, since
events
// before it are decided to be waited for.
- long requiredEventId = getGreatestSyncedEventId();
+ long requiredEventId = getLastSyncedEventId();
if (req.want_db_list) {
return getMinRequiredEventIdForDbList(requiredEventId);
}
@@ -1969,19 +2119,7 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
// checked by one RPC.
Set<String> dbNames = new HashSet<>();
Map<String, List<String>> db2Tables = new HashMap<>();
- for (TCatalogObject catalogObject: req.getObject_descs()) {
- if (catalogObject.isSetDb()) {
- dbNames.add(catalogObject.getDb().db_name);
- } else if (catalogObject.isSetTable()) {
- TTable table = catalogObject.getTable();
- if (catalog_.getDb(table.db_name) == null) {
- // We will check existence of missing dbs.
- dbNames.add(table.db_name);
- }
- db2Tables.computeIfAbsent(table.db_name, k -> new ArrayList<>())
- .add(table.tbl_name);
- }
- }
+ getRequestedDbTableNames(req, dbNames, db2Tables);
// Step 2: Check DB events
requiredEventId = getMinRequiredEventIdForDb(requiredEventId, dbNames,
req.want_table_list);
@@ -1996,6 +2134,30 @@ public class MetastoreEventsProcessor implements
ExternalEventsProcessor {
return requiredEventId;
}
+ /**
+ * Populates the database names, table names and groups table names by the
db name for
+ * the given HMS event sync request.
+ * @param req HMS event sync request
+ * @param dbNames database names container
+ * @param db2Tables database name to table names mapping container
+ */
+ private void getRequestedDbTableNames(TWaitForHmsEventRequest req,
+ Set<String> dbNames, Map<String, List<String>> db2Tables) {
+ for (TCatalogObject catalogObject: req.getObject_descs()) {
+ if (catalogObject.isSetDb()) {
+ dbNames.add(catalogObject.getDb().db_name);
+ } else if (catalogObject.isSetTable()) {
+ TTable table = catalogObject.getTable();
+ if (catalog_.getDb(table.db_name) == null) {
+ // We will check existence of missing dbs.
+ dbNames.add(table.db_name);
+ }
+ db2Tables.computeIfAbsent(table.db_name, k -> new ArrayList<>())
+ .add(table.tbl_name);
+ }
+ }
+ }
+
private static void doForAllObjectNames(TWaitForHmsEventRequest req,
Function<TTableName, Object> tblFn, @Nullable Function<String, Object>
dbFn) {
for (TCatalogObject catalogObject : req.getObject_descs()) {
diff --git
a/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
b/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
index 12eb39f62..d3363a9fe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/TableEventExecutor.java
@@ -212,6 +212,20 @@ public class TableEventExecutor {
BackendConfig.INSTANCE.getMinEventProcessorIdleMs();
}
+ /**
+ * Determines whether all events with event ids less than or equal to the
given event
+ * id have been processed for this table.
+ * @param eventId Event id up to which events are expected to be processed
+ * @return True if all events up to the given event id are processed.
False otherwise
+ */
+ private boolean isProcessed(long eventId) {
+ synchronized (processorLock_) {
+ // Return false if the TableProcessor is being removed.
+ if (isTerminating()) return false;
+ return events_.isEmpty() || events_.peek().getEventId() > eventId;
+ }
+ }
+
/**
* Determines whether TableProcessor is terminating.
* @return True if terminating. False otherwise
@@ -489,4 +503,17 @@ public class TableEventExecutor {
eventProcessor_.handleEventProcessException(e.getException(),
e.getEvent());
}
}
+
+ /**
+ * Determines whether all events with event ids less than or equal to the
given event id
+ * have been processed for the given table.
+ * @param fqTableName Fully qualified table name
+ * @param eventId Event id up to which events are expected to be processed
+ * @return True if all events up to the given event id are processed. False
otherwise
+ */
+ boolean isProcessed(String fqTableName, long eventId) {
+ TableProcessor tableProcessor = tableProcessors_.get(fqTableName);
+ if (tableProcessor == null) return true;
+ return tableProcessor.isProcessed(eventId);
+ }
}
diff --git
a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
index 02b6cd218..7f0e24523 100644
---
a/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
+++
b/fe/src/test/java/org/apache/impala/catalog/events/EventExecutorServiceTest.java
@@ -27,13 +27,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -199,21 +199,27 @@ public class EventExecutorServiceTest {
do {
isNeedProcess = false;
for (DbEventExecutor dbExecutor :
eventExecutorService.getDbEventExecutors()) {
- if (dbExecutor.getOutstandingEventCount() != 0) {
- isNeedProcess = true;
- dbExecutor.process();
- }
- for (TableEventExecutor tableExecutor :
dbExecutor.getTableEventExecutors()) {
- if (tableExecutor.getOutstandingEventCount() != 0) {
- isNeedProcess = true;
- tableExecutor.process();
- }
- }
+ isNeedProcess |= process(dbExecutor);
}
} while (--maxLoopTimes > 0 && isNeedProcess);
assertTrue("Events should be processed within the limit", maxLoopTimes >
0);
}
+ private boolean process(DbEventExecutor dbExecutor) {
+ boolean isNeedProcess = false;
+ if (dbExecutor.getOutstandingEventCount() != 0) {
+ isNeedProcess = true;
+ dbExecutor.process();
+ }
+ for (TableEventExecutor tableExecutor :
dbExecutor.getTableEventExecutors()) {
+ if (tableExecutor.getOutstandingEventCount() != 0) {
+ isNeedProcess = true;
+ tableExecutor.process();
+ }
+ }
+ return isNeedProcess;
+ }
+
private EventExecutorService createEventExecutorService(int
numDbEventExecutor,
int numTableEventExecutor) {
EventExecutorService eventExecutorService = new
EventExecutorService(eventsProcessor_,
@@ -911,6 +917,63 @@ public class EventExecutorServiceTest {
// processed
assertEquals(lastEventId, eventExecutorService.getGreatestSyncedEventId());
assertEquals(0, eventExecutorService.getPendingEventCount(lastEventId));
+ eventExecutorService.shutdown(true);
}
+ /**
+ * Tests whether events prior to the latest event id are processed at the
executors
+ * @throws Exception
+ */
+ @Test
+ public void testEventsProcessedTillLatestEvent() throws Exception {
+ EventExecutorService eventExecutorService = new
EventExecutorService(eventsProcessor_,
+ 2, 2);
+
eventExecutorService.setStatus(EventExecutorService.EventExecutorStatus.ACTIVE);
+ String t1 = "t1";
+ String t2 = "t2";
+ String t3 = "t3";
+ String t4 = "t4";
+ createDatabase(DB_NAME1);
+ createDatabase(DB_NAME2);
+ createTable(DB_NAME1, t1);
+ createTransactionalTable(DB_NAME1, t2, false);
+ createTable(DB_NAME2, t3);
+ createTransactionalTable(DB_NAME2, t4, true);
+
+ long latestEventId = eventsProcessor_.getCurrentEventId();
+ List<MetastoreEvent> metastoreEvents = eventsProcessor_.getEventsFactory()
+ .getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(),
+ eventsProcessor_.getMetrics());
+ for (MetastoreEvent event : metastoreEvents) {
+ eventExecutorService.dispatch(event);
+ }
+ DbEventExecutor dbExecutor1 =
eventExecutorService.getDbEventExecutors().get(0);
+ DbEventExecutor dbExecutor2 =
eventExecutorService.getDbEventExecutors().get(1);
+ assertEquals(new HashSet<>(Arrays.asList(DB_NAME1, DB_NAME2)),
+ eventExecutorService.getDbNames());
+ int maxLoopTimes = 2;
+ do {
+ assertFalse(eventExecutorService.isProcessed(DB_NAME1, latestEventId));
+ assertFalse(eventExecutorService.isProcessed(DB_NAME1, t1,
latestEventId));
+ assertFalse(eventExecutorService.isProcessed(DB_NAME1, t2,
latestEventId));
+ } while (process(dbExecutor1) && --maxLoopTimes > 0);
+ assertTrue(eventExecutorService.isProcessed(DB_NAME1, latestEventId));
+ assertTrue(eventExecutorService.isProcessed(DB_NAME1, t1, latestEventId));
+ assertTrue(eventExecutorService.isProcessed(DB_NAME1, t2, latestEventId));
+ assertEquals(new HashSet<>(Arrays.asList(t1, t2)),
+ new HashSet<>(eventExecutorService.getTableNames(DB_NAME1)));
+
+ maxLoopTimes = 2;
+ do {
+ assertFalse(eventExecutorService.isProcessed(DB_NAME2, latestEventId));
+ assertFalse(eventExecutorService.isProcessed(DB_NAME2, t3,
latestEventId));
+ assertFalse(eventExecutorService.isProcessed(DB_NAME2, t4,
latestEventId));
+ } while (process(dbExecutor2) && --maxLoopTimes > 0);
+ assertTrue(eventExecutorService.isProcessed(DB_NAME2, latestEventId));
+ assertTrue(eventExecutorService.isProcessed(DB_NAME2, t3, latestEventId));
+ assertTrue(eventExecutorService.isProcessed(DB_NAME2, t4, latestEventId));
+ assertEquals(new HashSet<>(Arrays.asList(t3, t4)),
+ new HashSet<>(eventExecutorService.getTableNames(DB_NAME2)));
+ eventExecutorService.shutdown(true);
+ }
}
diff --git a/tests/custom_cluster/test_events_custom_configs.py
b/tests/custom_cluster/test_events_custom_configs.py
index 55f7b2ea2..14d7eb1f9 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -27,6 +27,7 @@ from impala_thrift_gen.hive_metastore.ttypes import
FireEventRequest
from impala_thrift_gen.hive_metastore.ttypes import FireEventRequestData
from impala_thrift_gen.hive_metastore.ttypes import InsertEventRequestData
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.environ import ImpalaTestClusterProperties
from tests.common.impala_connection import ERROR, FINISHED
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
@@ -53,7 +54,7 @@ EVENT_SYNC_QUERY_OPTIONS = {
"sync_hms_events_wait_time_s": 10,
"sync_hms_events_strict_mode": True
}
-
+IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()
def wait_statestore_heartbeat(num_heartbeat=1):
"""Wait for state sync across impalads."""
@@ -2051,6 +2052,9 @@ class
TestEventSyncFailures(TestEventProcessingCustomConfigsBase):
"Timeout waiting for HMS events to be synced. Event id to wait for:")
assert expected_error in client_log
assert ". Last synced event id: " in client_log
+ if
IMPALA_TEST_CLUSTER_PROPERTIES.is_hierarchical_event_processing_enabled():
+ assert ". Failed to sync tables: {} database with tables [part].".format(
+ unique_database) in client_log
profile = client.get_runtime_profile(handle)
assert "Errors: " + expected_error in profile, profile
self.verify_timeline_item("Query Compilation", label, profile)