This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch branch-4.4.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0767d656ef00a381441fdcc3ebb3f146fb0d179c Author: stiga-huang <[email protected]> AuthorDate: Fri Mar 22 21:23:34 2024 +0800 IMPALA-12933: Avoid fetching unnecessary events of unwanted types There are several places where catalogd will fetch all events of a specific type on a table. E.g. in TableLoader#load(), if the table has an old createEventId, catalogd will fetch all CREATE_TABLE events after that createEventId on the table. Fetching the list of events is expensive since the filtering is done on client side, i.e. catalogd fetches all events and filters them locally based on the event type and table name. This could take hours if there are lots of events (e.g. 1M) in HMS. This patch sets the eventTypeSkipList with the complement set of the wanted type, so the get_next_notification RPC can filter out some events on HMS side. To avoid bringing too much computation overhead to HMS's underlying RDBMS in evaluating predicates of EVENT_TYPE != 'xxx', rare event types (e.g. DROP_ISCHEMA) are not added in the list. A new flag, common_hms_event_types, is added to specify the common HMS event types. Once HIVE-28146 is resolved, we can set the wanted types directly in the HMS RPC and this approach can be simplified. UPDATE_TBL_COL_STAT_EVENT and UPDATE_PART_COL_STAT_EVENT are the most common unused events for Impala. They are also added to the default skip list. A new flag, default_skipped_hms_event_types, is added to configure this list. This patch also fixes an issue where events of non-default catalogs are not filtered out. In a local perf test, I generated 100K RELOAD events after creating a table in Hive. Then I used the table in Impala to trigger metadata loading on it, which fetches the latest CREATE_TABLE event by polling all events after the last known CREATE_TABLE event. Before this patch, fetching the events takes 1s779ms. Now it takes only 395.377ms. Note that in production environments the event messages are usually larger, so the speedup could be even larger. 
Tests: - Added an FE test - Ran CORE tests Change-Id: Ieabe714328aa2cc605cb62b85ae8aa4bd537dbe9 Reviewed-on: http://gerrit.cloudera.org:8080/21186 Reviewed-by: Csaba Ringhofer <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/catalog/catalog-server.cc | 24 ++++ be/src/util/backend-gflag-util.cc | 4 + common/thrift/BackendGflags.thrift | 4 + .../org/apache/impala/compat/MetastoreShim.java | 4 +- .../org/apache/impala/compat/MetastoreShim.java | 8 +- .../impala/catalog/CatalogServiceCatalog.java | 32 ++++- .../impala/catalog/Hive3MetastoreShimBase.java | 11 +- .../org/apache/impala/catalog/TableLoader.java | 9 +- .../impala/catalog/events/MetastoreEvents.java | 16 +-- .../catalog/events/MetastoreEventsProcessor.java | 95 ++++++++++++--- .../metastore/CatalogMetastoreServiceHandler.java | 28 ++--- .../catalog/metastore/MetastoreServiceHandler.java | 20 +-- .../org/apache/impala/service/BackendConfig.java | 8 ++ .../apache/impala/service/CatalogOpExecutor.java | 134 +++++++++++---------- .../events/MetastoreEventsProcessorTest.java | 73 +++++++++-- 15 files changed, 319 insertions(+), 151 deletions(-) diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc index 9f4d2213e..5f318278a 100644 --- a/be/src/catalog/catalog-server.cc +++ b/be/src/catalog/catalog-server.cc @@ -190,6 +190,30 @@ DEFINE_double_hidden(inject_process_event_failure_ratio, 1.0, "event failure. If the generated random number is lesser than this value, then we" "fail the event processor(EP)."); +DEFINE_string(default_skipped_hms_event_types, + "OPEN_TXN,UPDATE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT", + "HMS event types that are not used by Impala. They are skipped by default in " + "fetching HMS event batches. Only in few places they will be fetched, e.g. 
fetching " + "the latest event time in HMS."); +DEFINE_string(common_hms_event_types, "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION," + "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,CREATE_TABLE,ALTER_TABLE,DROP_TABLE," + "CREATE_DATABASE,ALTER_DATABASE,DROP_DATABASE,INSERT,OPEN_TXN,COMMIT_TXN,ABORT_TXN," + "ALLOC_WRITE_ID_EVENT,ACID_WRITE_EVENT,BATCH_ACID_WRITE_EVENT," + "UPDATE_TBL_COL_STAT_EVENT,DELETE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT," + "UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,COMMIT_COMPACTION_EVENT," + "RELOAD", + "Common HMS event types that will be used in eventTypeSkipList when fetching events " + "from HMS. The strings come from constants in " + "org.apache.hadoop.hive.metastore.messaging.MessageBuilder. When bumping Hive " + "versions, the list might need to be updated accordingly. To avoid bringing too much " + "computation overhead to HMS's underlying RDBMS in evaluating predicates of " + "EVENT_TYPE != 'xxx', rare event types are not tracked in this list. 
They are " + "CREATE_FUNCTION,DROP_FUNCTION,ADD_PRIMARYKEY,ADD_FOREIGNKEY,ADD_UNIQUECONSTRAINT," + "ADD_NOTNULLCONSTRAINT, ADD_DEFAULTCONSTRAINT, ADD_CHECKCONSTRAINT, DROP_CONSTRAINT," + "CREATE_ISCHEMA, ALTER_ISCHEMA, DROP_ISCHEMA, ADD_SCHEMA_VERSION," + "ALTER_SCHEMA_VERSION, DROP_SCHEMA_VERSION, CREATE_CATALOG, ALTER_CATALOG," + "DROP_CATALOG, CREATE_DATACONNECTOR, ALTER_DATACONNECTOR, DROP_DATACONNECTOR."); + DECLARE_string(state_store_host); DECLARE_int32(state_store_port); DECLARE_string(state_store_2_host); diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index 6ae72e671..18bd639a8 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -123,6 +123,8 @@ DECLARE_string(inject_process_event_failure_event_types); DECLARE_double(inject_process_event_failure_ratio); DECLARE_bool(enable_workload_mgmt); DECLARE_string(query_log_table_name); +DECLARE_string(default_skipped_hms_event_types); +DECLARE_string(common_hms_event_types); // HS2 SAML2.0 configuration // Defined here because TAG_FLAG caused issues in global-flags.cc @@ -476,6 +478,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) { cfg.__set_enable_workload_mgmt(FLAGS_enable_workload_mgmt); cfg.__set_query_log_table_name(FLAGS_query_log_table_name); cfg.__set_query_cpu_root_factor(FLAGS_query_cpu_root_factor); + cfg.__set_default_skipped_hms_event_types(FLAGS_default_skipped_hms_event_types); + cfg.__set_common_hms_event_types(FLAGS_common_hms_event_types); return Status::OK(); } diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index e0c89b3bd..285316a3c 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -294,4 +294,8 @@ struct TBackendGflags { 131: required string query_log_table_name 132: required double query_cpu_root_factor + + 133: required string default_skipped_hms_event_types + + 134: required string common_hms_event_types } diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 5137beee9..c34d2d8af 100644 --- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -461,12 +461,12 @@ public class MetastoreShim extends Hive3MetastoreShimBase { * @see org.apache.hadoop.hive.metastore.HiveMetaStoreClient#getNextNotification * @param msClient Metastore client * @param eventRequest Notification event request - * @param isSkipUnwantedEventTypes Whether to set skip event types in request + * @param eventTypeSkipList unused * @return NotificationEventResponse * @throws TException */ public static NotificationEventResponse getNextNotification(IMetaStoreClient msClient, - NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes) + NotificationEventRequest eventRequest, List<String> eventTypeSkipList) throws TException { return getThriftClient(msClient).get_next_notification(eventRequest); } diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 9ecad6284..f5354c0a4 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -597,15 +597,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase { * * @param msClient Metastore client * @param eventRequest Notification event request - * @param isSkipUnwantedEventTypes Whether to set skip event types in request + * @param eventTypeSkipList Unwanted event types * @return NotificationEventResponse * @throws TException */ public static NotificationEventResponse getNextNotification(IMetaStoreClient msClient, - NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes) + NotificationEventRequest 
eventRequest, List<String> eventTypeSkipList) throws TException { - if (isSkipUnwantedEventTypes) { - eventRequest.setEventTypeSkipList(MetastoreEventsProcessor.getEventSkipList()); + if (eventTypeSkipList != null) { + eventRequest.setEventTypeSkipList(eventTypeSkipList); } return msClient.getThriftClient().get_next_notification(eventRequest); } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 689bb233b..87a7d59f8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -349,6 +349,16 @@ public class CatalogServiceCatalog extends Catalog { // A variable to test expected failed events private final Set<String> failureEventsForTesting_; + // List of event types to skip by default while fetching notification events from + // metastore. + private final List<String> defaultSkippedHmsEventTypes_; + + // List of common known HMS event types. Ignores those that are rare in regular (e.g. + // daily) jobs. The list is used to generate a complement set for wanted HMS event + // types. We need this list until HIVE-28146 is resolved. After that we can directly + // specify what event types we want. + private final List<String> commonHmsEventTypes_; + // Total number of dbs, tables and functions in the catalog cache. // Updated in each catalog topic update (getCatalogDelta()). 
private int numDbs_ = 0; @@ -415,6 +425,21 @@ public class CatalogServiceCatalog extends Catalog { Splitter.on(',').trimResults().omitEmptyStrings().split(failureEvents)) { failureEventsForTesting_.add(tblProps); } + defaultSkippedHmsEventTypes_ = Lists.newArrayList(); + Iterable<String> eventTypes = Splitter.on(',') + .trimResults().omitEmptyStrings() + .split(BackendConfig.INSTANCE.getDefaultSkippedHmsEventTypes()); + for (String eventType : eventTypes) { + defaultSkippedHmsEventTypes_.add(eventType); + } + LOG.info("Default skipped HMS event types: " + defaultSkippedHmsEventTypes_); + commonHmsEventTypes_ = Lists.newArrayList(); + eventTypes = Splitter.on(',').trimResults().omitEmptyStrings() + .split(BackendConfig.INSTANCE.getCommonHmsEventTypes()); + for (String eventType : eventTypes) { + commonHmsEventTypes_.add(eventType); + } + LOG.info("Common HMS event types: " + commonHmsEventTypes_); } public void startEventsProcessor() { @@ -4104,7 +4129,6 @@ public class CatalogServiceCatalog extends Catalog { this.metastoreEventProcessor_ = metastoreEventProcessor; } - @VisibleForTesting public void setCatalogMetastoreServer(ICatalogMetastoreServer catalogMetastoreServer) { this.catalogMetastoreServer_ = catalogMetastoreServer; } @@ -4141,4 +4165,10 @@ public class CatalogServiceCatalog extends Catalog { } public Set<String> getFailureEventsForTesting() { return failureEventsForTesting_; } + + public List<String> getDefaultSkippedHmsEventTypes() { + return defaultSkippedHmsEventTypes_; + } + + public List<String> getCommonHmsEventTypes() { return commonHmsEventTypes_; } } diff --git a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java index cdd72aad7..9fb6b7f37 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java +++ b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java @@ -298,14 +298,21 @@ public class 
Hive3MetastoreShimBase { // hive-3 introduces a catalog object in hive // Impala only supports the default catalog of hive - private static final String defaultCatalogName_ = MetaStoreUtils + private static final String DEFAULT_CATALOG_NAME = MetaStoreUtils .getDefaultCatalog(MetastoreConf.newMetastoreConf()); /** * Gets the name of the default catalog from metastore configuration. */ public static String getDefaultCatalogName() { - return defaultCatalogName_; + return DEFAULT_CATALOG_NAME; + } + + /** + * Returns whether the catalog name is the default catalog. + */ + public static boolean isDefaultCatalog(String catalogName) { + return DEFAULT_CATALOG_NAME.equalsIgnoreCase(catalogName); } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java index 79b388cb5..a30313f15 100644 --- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java @@ -24,8 +24,6 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.impala.catalog.events.EventFactory; -import org.apache.impala.catalog.events.MetastoreEvents; import org.apache.impala.catalog.events.MetastoreEvents.CreateTableEvent; import org.apache.impala.catalog.events.MetastoreEventsProcessor; import org.apache.impala.common.Metrics; @@ -101,11 +99,8 @@ public class TableLoader { // we are only interested in fetching the events if we have a valid eventId // for a table. For tables where eventId is unknown are not created by // this catalogd and hence the self-event detection logic does not apply. 
- events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, - eventId, notificationEvent -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE - .equals(notificationEvent.getEventType()) - && notificationEvent.getDbName().equalsIgnoreCase(db.getName()) - && notificationEvent.getTableName().equalsIgnoreCase(tblName)); + events = MetastoreEventsProcessor.getNextMetastoreEventsInBatchesForTable( + catalog_, eventId, db.getName(), tblName, CreateTableEvent.EVENT_TYPE); catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH); } if (events != null && !events.isEmpty()) { diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index bf861957f..726455923 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -83,7 +83,6 @@ import org.apache.impala.thrift.TPartitionKeyValue; import org.apache.impala.thrift.TTableName; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.DebugUtils; -import org.apache.impala.util.EventSequence; import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.NoOpEventSequence; import org.slf4j.LoggerFactory; @@ -279,8 +278,7 @@ public class MetastoreEvents { String catalogName = currentEvent.getCatalogName(); String eventDb = currentEvent.getDbName(); String eventTbl = currentEvent.getTableName(); - if (catalogName != null && !MetastoreShim.getDefaultCatalogName() - .equalsIgnoreCase(catalogName)) { + if (catalogName != null && !MetastoreShim.isDefaultCatalog(catalogName)) { // currently Impala doesn't support custom hive catalogs and hence we should // ignore all the events which are on non-default catalog namespaces. 
LOG.debug(currentEvent.debugString( @@ -1405,7 +1403,7 @@ public class MetastoreEvents { */ public static class CreateTableEvent extends MetastoreTableEvent { - public static final String CREATE_TABLE_EVENT_TYPE = "CREATE_TABLE"; + public static final String EVENT_TYPE = "CREATE_TABLE"; /** * Prevent instantiation from outside should use MetastoreEventFactory instead */ @@ -1662,6 +1660,7 @@ public class MetastoreEvents { * MetastoreEvent for ALTER_TABLE event type */ public static class AlterTableEvent extends MetastoreTableEvent { + public static final String EVENT_TYPE = "ALTER_TABLE"; protected org.apache.hadoop.hive.metastore.api.Table tableBefore_; // the table object after alter operation, as parsed from the NotificationEvent protected org.apache.hadoop.hive.metastore.api.Table tableAfter_; @@ -2074,7 +2073,7 @@ public class MetastoreEvents { */ public static class DropTableEvent extends MetastoreTableEvent { - public static final String DROP_TABLE_EVENT_TYPE = "DROP_TABLE"; + public static final String EVENT_TYPE = "DROP_TABLE"; /** * Prevent instantiation from outside should use MetastoreEventFactory instead @@ -2145,7 +2144,7 @@ public class MetastoreEvents { */ public static class CreateDatabaseEvent extends MetastoreDatabaseEvent { - public static final String CREATE_DATABASE_EVENT_TYPE = "CREATE_DATABASE"; + public static final String EVENT_TYPE = "CREATE_DATABASE"; // metastore database object as parsed from NotificationEvent message private final Database createdDatabase_; @@ -2294,7 +2293,7 @@ public class MetastoreEvents { */ public static class DropDatabaseEvent extends MetastoreDatabaseEvent { - public static final String DROP_DATABASE_EVENT_TYPE = "DROP_DATABASE"; + public static final String EVENT_TYPE = "DROP_DATABASE"; // Metastore database object as parsed from NotificationEvent message private final Database droppedDatabase_; /** @@ -2395,7 +2394,7 @@ public class MetastoreEvents { } public static class AddPartitionEvent extends 
MetastoreTableEvent { - public static final String ADD_PARTITION_EVENT_TYPE = "ADD_PARTITION"; + public static final String EVENT_TYPE = "ADD_PARTITION"; private final List<Partition> addedPartitions_; private final List<List<TPartitionKeyValue>> partitionKeyVals_; @@ -2501,6 +2500,7 @@ public class MetastoreEvents { } public static class AlterPartitionEvent extends MetastoreTableEvent { + public static final String EVENT_TYPE = "ALTER_PARTITION"; // the Partition object before alter operation, as parsed from the NotificationEvent private final org.apache.hadoop.hive.metastore.api.Partition partitionBefore_; // the Partition object after alter operation, as parsed from the NotificationEvent diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java index e6a683a3c..8dabf8be2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java @@ -22,6 +22,7 @@ import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.time.LocalDateTime; import java.util.ArrayList; @@ -54,6 +55,7 @@ import org.apache.impala.catalog.Table; import org.apache.impala.catalog.IncompleteTable; import org.apache.impala.catalog.events.ConfigValidator.ValidationResult; import org.apache.impala.catalog.events.MetastoreEvents.DropDatabaseEvent; +import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory; import org.apache.impala.common.Metrics; @@ -277,19 +279,54 @@ public class 
MetastoreEventsProcessor implements ExternalEventsProcessor { private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L; - // List of event types to skip while fetching notification events from metastore - private static final List<String> EVENT_SKIP_LIST = Arrays.asList("OPEN_TXN"); + public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb( + CatalogServiceCatalog catalog, long eventId, String dbName, String eventType) + throws MetastoreNotificationException { + return getNextMetastoreEventsInBatchesForDb(catalog, eventId, + MetastoreShim.getDefaultCatalogName(), dbName, eventType); + } + + public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb( + CatalogServiceCatalog catalog, long eventId, String catName, + String dbName, String eventType) throws MetastoreNotificationException { + Preconditions.checkNotNull(eventType, "eventType is null in fetching db events"); + Preconditions.checkNotNull(catName, "catName is null in fetching db events"); + Preconditions.checkNotNull(dbName, "dbName is null in fetching db events"); + NotificationFilter filter = notificationEvent -> + eventType.equals(notificationEvent.getEventType()) + && catName.equalsIgnoreCase(notificationEvent.getCatName()) + && dbName.equalsIgnoreCase(notificationEvent.getDbName()); + return getNextMetastoreEventsInBatches(catalog, eventId, filter, eventType); + } + + public static List<NotificationEvent> getNextMetastoreEventsInBatchesForTable( + CatalogServiceCatalog catalog, long eventId, String dbName, String tblName, + String eventType) throws MetastoreNotificationException { + return getNextMetastoreEventsInBatchesForTable(catalog, eventId, + MetastoreShim.getDefaultCatalogName(), dbName, tblName, eventType); + } + + public static List<NotificationEvent> getNextMetastoreEventsInBatchesForTable( + CatalogServiceCatalog catalog, long eventId, String catName, String dbName, + String tblName, String eventType) throws MetastoreNotificationException { + 
Preconditions.checkNotNull(eventType, "eventType is null in fetching table events"); + Preconditions.checkNotNull(catName, "catName is null in fetching table events"); + Preconditions.checkNotNull(dbName, "dbName is null in fetching table events"); + Preconditions.checkNotNull(tblName, "tblName is null in fetching table events"); + NotificationFilter filter = notificationEvent -> + eventType.equals(notificationEvent.getEventType()) + && catName.equalsIgnoreCase(notificationEvent.getCatName()) + && dbName.equalsIgnoreCase(notificationEvent.getDbName()) + && tblName.equalsIgnoreCase(notificationEvent.getTableName()); + return getNextMetastoreEventsInBatches(catalog, eventId, filter, + EVENTS_BATCH_SIZE_PER_RPC, eventType); + } - /** - * Wrapper around {@link - * MetastoreEventsProcessor#getNextMetastoreEventsInBatches(CatalogServiceCatalog, - * long, NotificationFilter, int)} which passes the default batch size. - */ public static List<NotificationEvent> getNextMetastoreEventsInBatches( - CatalogServiceCatalog catalog, long eventId, NotificationFilter filter) - throws MetastoreNotificationFetchException { + CatalogServiceCatalog catalog, long eventId, NotificationFilter filter, + String... eventTypes) throws MetastoreNotificationFetchException { return getNextMetastoreEventsInBatches(catalog, eventId, filter, - EVENTS_BATCH_SIZE_PER_RPC); + EVENTS_BATCH_SIZE_PER_RPC, eventTypes); } /** @@ -309,7 +346,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { @VisibleForTesting public static List<NotificationEvent> getNextMetastoreEventsInBatches( CatalogServiceCatalog catalog, long eventId, NotificationFilter filter, - int eventsBatchSize) throws MetastoreNotificationFetchException { + int eventsBatchSize, String... 
eventTypes) + throws MetastoreNotificationFetchException { Preconditions.checkArgument(eventsBatchSize > 0); List<NotificationEvent> result = new ArrayList<>(); try (MetaStoreClient msc = catalog.getMetaStoreClient()) { @@ -317,6 +355,17 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { .getEventId(); if (toEventId <= eventId) return result; long currentEventId = eventId; + List<String> eventTypeSkipList = catalog.getDefaultSkippedHmsEventTypes(); + String typeStr = null; + if (eventTypes != null && eventTypes.length > 0) { + eventTypeSkipList = Lists.newArrayList(catalog.getCommonHmsEventTypes()); + eventTypeSkipList.removeIf(s -> Arrays.asList(eventTypes).contains(s)); + typeStr = String.join(",", eventTypes) + " "; + } + LOG.info("Fetching {}events started from id {} to {}. Gap: {}", + typeStr == null ? "" : typeStr, eventId, toEventId, + toEventId - eventId); + int numFilteredEvents = 0; while (currentEventId < toEventId) { int batchSize = Math .min(eventsBatchSize, (int)(toEventId - currentEventId)); @@ -328,17 +377,28 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { eventRequest.setMaxEvents(batchSize); eventRequest.setLastEvent(currentEventId); NotificationEventResponse notificationEventResponse = - MetastoreShim.getNextNotification(msc.getHiveClient(), eventRequest, true); + MetastoreShim.getNextNotification(msc.getHiveClient(), eventRequest, + eventTypeSkipList); if (notificationEventResponse.getEvents().isEmpty()) { // Possible to receive empty list due to event skip list in request - return result; + break; } + for (NotificationEvent event : notificationEventResponse.getEvents()) { // if no filter is provided we add all the events - if (filter == null || filter.accept(event)) result.add(event); + if (filter == null || filter.accept(event)) { + result.add(event); + } else { + numFilteredEvents++; + } currentEventId = event.getEventId(); } } + if (numFilteredEvents > 0) { + LOG.info("Got {} events 
and filtered out {} locally from {} events start " + + "from id {}", + result.size(), numFilteredEvents, toEventId - eventId, eventId + 1); + } return result; } catch (MetastoreClientInstantiationException | TException e) { throw new MetastoreNotificationFetchException(String.format( @@ -785,7 +845,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { eventRequest.setMaxEvents(1); try { NotificationEventResponse response = MetastoreShim.getNextNotification( - msClient.getHiveClient(), eventRequest, false); + msClient.getHiveClient(), eventRequest, null); Iterator<NotificationEvent> eventIter = response.getEventsIterator(); if (!eventIter.hasNext()) { LOG.warn("Unable to fetch event {}. It has been cleaned up", eventId); @@ -917,7 +977,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { eventRequest.setLastEvent(eventId); eventRequest.setMaxEvents(batchSize); NotificationEventResponse response = - MetastoreShim.getNextNotification(msClient.getHiveClient(), eventRequest, true); + MetastoreShim.getNextNotification(msClient.getHiveClient(), eventRequest, + catalog_.getDefaultSkippedHmsEventTypes()); LOG.info(String.format("Received %d events. 
Start event id : %d", response.getEvents().size(), eventId)); if (filter == null) return response.getEvents(); @@ -1373,6 +1434,4 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor { public static MessageDeserializer getMessageDeserializer() { return MESSAGE_DESERIALIZER; } - - public static List<String> getEventSkipList() { return EVENT_SKIP_LIST; } } diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java index a5b2114c7..8df9673ff 100644 --- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java +++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java @@ -214,13 +214,9 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler { throw e; } } - List<NotificationEvent> events = - MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, fromEventId, - notificationEvent -> - MetastoreEvents.CreateDatabaseEvent.CREATE_DATABASE_EVENT_TYPE - .equals(notificationEvent.getEventType()) - && dbName.equalsIgnoreCase(notificationEvent.getDbName())); - + List<NotificationEvent> events = MetastoreEventsProcessor + .getNextMetastoreEventsInBatchesForDb(catalog_, fromEventId, dbName, + MetastoreEvents.CreateDatabaseEvent.EVENT_TYPE); Preconditions.checkArgument(events.size() == 1, "Db %s was recreated in metastore " + "while the current db creation was in progress", dbName); @@ -1330,12 +1326,11 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler { T resp = task.execute(); // Rename scenario, remove old table and add new one try { + // the alter table event is generated on the renamed table List<NotificationEvent> events = - MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, - currentEventId, event -> "ALTER_TABLE".equals(event.getEventType()) - // the alter table event is 
generated on the renamed table - && newTable.getDbName().equalsIgnoreCase(event.getDbName()) - && newTable.getTableName().equalsIgnoreCase(event.getTableName())); + MetastoreEventsProcessor.getNextMetastoreEventsInBatchesForTable(catalog_, + currentEventId, newTable.getDbName(), newTable.getTableName(), + MetastoreEvents.AlterTableEvent.EVENT_TYPE); Preconditions.checkState(events.size() == 1, String.format("For table %s.%s, " + "from event id: %s, expected ALTER_TABLE events size to be 1 but is %s", newTable.getDbName(), newTable.getTableName(), currentEventId, @@ -1418,13 +1413,8 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler { "exception {} from metastore", dbName, tblName, e.getClass().getName()); } List<NotificationEvent> events = - MetastoreEventsProcessor - .getNextMetastoreEventsInBatches(catalog_, fromEventId, - event -> MetastoreEvents.CreateTableEvent.CREATE_TABLE_EVENT_TYPE - .equals(event.getEventType()) - && dbName.equalsIgnoreCase(event.getDbName()) - && tblName.equalsIgnoreCase(event.getTableName())); - + MetastoreEventsProcessor.getNextMetastoreEventsInBatchesForTable(catalog_, + fromEventId, dbName, tblName, MetastoreEvents.CreateTableEvent.EVENT_TYPE); Preconditions.checkState(events.size() == 1, "Table %s.%s was recreated in metastore since event id %s" + "while the current table creation was in progress", dbName, tblName, diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java index 859d71c49..2076457d3 100644 --- a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java +++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java @@ -152,7 +152,6 @@ import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import 
org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.ListPackageRequest; -import org.apache.hadoop.hive.metastore.api.ListStoredProcedureRequest; import org.apache.hadoop.hive.metastore.api.LockRequest; import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.MapSchemaVersionToSerdeRequest; @@ -290,8 +289,6 @@ import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.events.EventFactory; import org.apache.impala.catalog.events.MetastoreEvents; import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent; -import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; -import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory; import org.apache.impala.catalog.events.MetastoreEventsProcessor; import org.apache.impala.common.Metrics; import org.apache.impala.common.Reference; @@ -3198,12 +3195,8 @@ public abstract class MetastoreServiceHandler extends AbstractThriftHiveMetastor dbName = catAndDbName[1]; try { List<NotificationEvent> events = MetastoreEventsProcessor - .getNextMetastoreEventsInBatches(catalog_, beforeDropEventId, - event -> event.getEventType() - .equalsIgnoreCase(DropTableEvent.DROP_TABLE_EVENT_TYPE) - && catName.equalsIgnoreCase(event.getCatName()) - && dbName.equalsIgnoreCase(event.getDbName()) - && tableName.equalsIgnoreCase(event.getTableName())); + .getNextMetastoreEventsInBatchesForTable(catalog_, beforeDropEventId, + catName, dbName, tableName, DropTableEvent.EVENT_TYPE); if (events.isEmpty()) { throw new MetaException( "Drop table event not received. 
Check if notification events are " @@ -3294,13 +3287,8 @@ public abstract class MetastoreServiceHandler extends AbstractThriftHiveMetastor try { List<NotificationEvent> events = MetastoreEventsProcessor - .getNextMetastoreEventsInBatches(catalog_, beforeDropEventId, - event -> event.getEventType() - .equalsIgnoreCase(MetastoreEvents.DropDatabaseEvent - .DROP_DATABASE_EVENT_TYPE) - && catName.equalsIgnoreCase(event.getCatName()) - && dbName.equalsIgnoreCase(event.getDbName())); - + .getNextMetastoreEventsInBatchesForDb(catalog_, beforeDropEventId, catName, + dbName, MetastoreEvents.DropDatabaseEvent.EVENT_TYPE); if (events.size() == 0) { if (ignoreUnknownDb) { LOG.debug("db {} does not exist in metastore. Removing it from catalog if " diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index ed5a5fb90..2add99a2f 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -498,4 +498,12 @@ public class BackendConfig { } public double getQueryCpuRootFactor() { return backendCfg_.query_cpu_root_factor; } + + public String getDefaultSkippedHmsEventTypes() { + return backendCfg_.default_skipped_hms_event_types; + } + + public String getCommonHmsEventTypes() { + return backendCfg_.common_hms_event_types; + } } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 85722ffe7..05043b99f 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -2128,12 +2128,8 @@ public class CatalogOpExecutor { long eventId = getCurrentEventId(msClient, catalogTimeline); msClient.getHiveClient().createDatabase(db); catalogTimeline.markEvent("Created database in Metastore"); - List<NotificationEvent> events = 
getNextMetastoreEventsIfEnabled( - catalogTimeline, eventId, - notificationEvent -> - CreateDatabaseEvent.CREATE_DATABASE_EVENT_TYPE - .equals(notificationEvent.getEventType()) - && dbName.equalsIgnoreCase(notificationEvent.getDbName())); + List<NotificationEvent> events = getNextMetastoreEventsForDbIfEnabled( + catalogTimeline, eventId, dbName, CreateDatabaseEvent.EVENT_TYPE); Pair<Long, Database> eventDbPair = getDatabaseFromEvents(events, params.if_not_exists); if (eventDbPair == null) { @@ -2196,16 +2192,53 @@ public class CatalogOpExecutor { /** * Wrapper around - * {@code MetastoreEventsProcessor#getNextMetastoreEventsInBatches} with the + * {@code MetastoreEventsProcessor#getNextMetastoreEventsInBatchesForTable} with the * addition that it checks if events processing is active or not. If not active, - * returns an empty list. Also updates the given 'catalogTimeline' + * returns an empty list. Also updates the given 'catalogTimeline'. */ - private List<NotificationEvent> getNextMetastoreEventsIfEnabled( - EventSequence catalogTimeline, long eventId, NotificationFilter eventsFilter) + private List<NotificationEvent> getNextMetastoreEventsForTableIfEnabled( + EventSequence catalogTimeline, long eventId, String dbName, String tblName, + String eventType) throws MetastoreNotificationException { + if (!catalog_.isEventProcessingActive()) return Collections.emptyList(); + List<NotificationEvent> events = MetastoreEventsProcessor + .getNextMetastoreEventsInBatchesForTable(catalog_, eventId, dbName, tblName, + eventType); + catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH); + return events; + } + + /** + * Wrapper around + * {@code MetastoreEventsProcessor#getNextMetastoreEventsInBatchesForDb} with the + * addition that it checks if events processing is active or not. If not active, + * returns an empty list. Also updates the given 'catalogTimeline'. 
+ */ + private List<NotificationEvent> getNextMetastoreEventsForDbIfEnabled( + EventSequence catalogTimeline, long eventId, String dbName, String eventType) throws MetastoreNotificationException { if (!catalog_.isEventProcessingActive()) return Collections.emptyList(); List<NotificationEvent> events = MetastoreEventsProcessor - .getNextMetastoreEventsInBatches(catalog_, eventId, eventsFilter); + .getNextMetastoreEventsInBatchesForDb(catalog_, eventId, dbName, eventType); + catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH); + return events; + } + + /** + * Fetches DropDatabase and DropTable events of a db if events processing is active. + * Returns an empty list if not active. Also updates the given 'catalogTimeline'. + */ + private List<NotificationEvent> getNextMetastoreDropEventsForDbIfEnabled( + EventSequence catalogTimeline, long eventId, String dbName) + throws MetastoreNotificationException { + if (!catalog_.isEventProcessingActive()) return Collections.emptyList(); + List<String> eventTypes = Lists.newArrayList( + DropDatabaseEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE); + NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName()) + && MetastoreShim.isDefaultCatalog(e.getCatName()) + && eventTypes.contains(e.getEventType()); + List<NotificationEvent> events = MetastoreEventsProcessor + .getNextMetastoreEventsInBatches(catalog_, eventId, filter, + DropDatabaseEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE); catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH); return events; } @@ -2236,7 +2269,9 @@ public class CatalogOpExecutor { MetastoreEvent event = catalog_ .getMetastoreEventProcessor().getEventsFactory() .get(events.get(events.size() - 1), null); - Preconditions.checkState(event instanceof CreateDatabaseEvent); + Preconditions.checkState(event instanceof CreateDatabaseEvent, + "Expects CreateDatabaseEvent but got %s. 
All events: %s", + event, events); return new Pair<>(events.get(0).getEventId(), ((CreateDatabaseEvent) event).getDatabase()); } catch (MetastoreNotificationException e) { @@ -2746,11 +2781,8 @@ public class CatalogOpExecutor { dbName, /* deleteData */true, /* ignoreIfUnknown */false, params.cascade); catalogTimeline.markEvent("Dropped database in Metastore"); - List<NotificationEvent> events = getNextMetastoreEventsIfEnabled( - catalogTimeline, eventId, - event -> dbName.equalsIgnoreCase(event.getDbName()) && ( - DropDatabaseEvent.DROP_DATABASE_EVENT_TYPE.equals(event.getEventType()) || - DropTableEvent.DROP_TABLE_EVENT_TYPE.equals(event.getEventType()))); + List<NotificationEvent> events = getNextMetastoreDropEventsForDbIfEnabled( + catalogTimeline, eventId, dbName); addToDeleteEventLog(events); addSummary(resp, "Database has been dropped."); } catch (TException e) { @@ -2801,12 +2833,12 @@ public class CatalogOpExecutor { for (NotificationEvent event : events) { String eventType = event.getEventType(); Preconditions.checkState( - eventType.equals(DropDatabaseEvent.DROP_DATABASE_EVENT_TYPE) || - eventType.equals(DropTableEvent.DROP_TABLE_EVENT_TYPE) || + eventType.equals(DropDatabaseEvent.EVENT_TYPE) || + eventType.equals(DropTableEvent.EVENT_TYPE) || eventType.equals(DropPartitionEvent.EVENT_TYPE), "Can not add event type: " + "%s to deleteEventLog", eventType); String key; - if (DropDatabaseEvent.DROP_DATABASE_EVENT_TYPE.equals(event.getEventType())) { + if (DropDatabaseEvent.EVENT_TYPE.equals(event.getEventType())) { key = DeleteEventLog.getDbKey(event.getDbName()); } else { Preconditions.checkNotNull(event.getTableName()); @@ -3073,12 +3105,9 @@ public class CatalogOpExecutor { String.format(HMS_RPC_ERROR_FORMAT_STR, "dropTable"), e); } } - List<NotificationEvent> events; - final org.apache.hadoop.hive.metastore.api.Table finalMsTbl = msTbl; - events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId, - event -> 
DropTableEvent.DROP_TABLE_EVENT_TYPE.equals(event.getEventType()) - && finalMsTbl.getDbName().equalsIgnoreCase(event.getDbName()) - && finalMsTbl.getTableName().equalsIgnoreCase(event.getTableName())); + List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled( + catalogTimeline, eventId, tableName.getDb(), tableName.getTbl(), + DropTableEvent.EVENT_TYPE); addSummary(resp, (params.is_table ? "Table " : "View ") + "has been dropped."); addToDeleteEventLog(events); Table table = catalog_.removeTable(params.getTable_name().db_name, @@ -3714,12 +3743,8 @@ public class CatalogOpExecutor { addSummary(response, "Table already exists."); return false; } - events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId, - event -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE - .equals(event.getEventType()) - && newTable.getDbName().equalsIgnoreCase(event.getDbName()) - && newTable.getTableName() - .equalsIgnoreCase(event.getTableName())); + events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, eventId, + newTable.getDbName(), newTable.getTableName(), CreateTableEvent.EVENT_TYPE); } } // in case of synchronized tables it is possible that Kudu doesn't generate @@ -3798,15 +3823,9 @@ public class CatalogOpExecutor { } catalogTimeline.markEvent(CREATED_HMS_TABLE); addSummary(response, "Table has been created."); - final org.apache.hadoop.hive.metastore.api.Table finalNewTable = newTable; - List<NotificationEvent> events = getNextMetastoreEventsIfEnabled( - catalogTimeline, eventId, - notificationEvent -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE - .equals(notificationEvent.getEventType()) - && finalNewTable.getDbName() - .equalsIgnoreCase(notificationEvent.getDbName()) - && finalNewTable.getTableName() - .equalsIgnoreCase(notificationEvent.getTableName())); + List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled( + catalogTimeline, eventId, newTable.getDbName(), newTable.getTableName(), + CreateTableEvent.EVENT_TYPE); eventIdTblPair = 
getTableFromEvents(events, if_not_exists); if (eventIdTblPair == null) { // TODO (HIVE-21807): Creating a table and retrieving the table information is @@ -4021,10 +4040,8 @@ public class CatalogOpExecutor { msClient.getHiveClient().createTable(newTable); catalogTimeline.markEvent(CREATED_HMS_TABLE); } - events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId, event -> - CreateTableEvent.CREATE_TABLE_EVENT_TYPE.equals(event.getEventType()) - && newTable.getDbName().equalsIgnoreCase(event.getDbName()) - && newTable.getTableName().equalsIgnoreCase(event.getTableName())); + events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, eventId, + newTable.getDbName(), newTable.getTableName(), CreateTableEvent.EVENT_TYPE); } else { addSummary(response, "Table already exists."); return false; @@ -5126,12 +5143,9 @@ public class CatalogOpExecutor { LOG.info("Added {}/{} partitions in HMS for table {}", numDone, allHmsPartitionsToAdd.size(), tbl.getFullName()); org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable(); - List<NotificationEvent> events = getNextMetastoreEventsIfEnabled( - catalogTimeline, eventId, - event -> AddPartitionEvent.ADD_PARTITION_EVENT_TYPE - .equals(event.getEventType()) - && msTbl.getDbName().equalsIgnoreCase(event.getDbName()) - && msTbl.getTableName().equalsIgnoreCase(event.getTableName())); + List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled( + catalogTimeline, eventId, msTbl.getDbName(), msTbl.getTableName(), + AddPartitionEvent.EVENT_TYPE); Map<Partition, Long> partitionToEventSubMap = Maps.newHashMap(); getPartitionsFromEvent(events, partitionToEventSubMap); // set the eventId to last one which we received so the we fetch the next @@ -5351,13 +5365,9 @@ public class CatalogOpExecutor { } catalogTimeline.markEvent("Dropped partitions in Metastore"); } - List<NotificationEvent> events = getNextMetastoreEventsIfEnabled( - catalogTimeline, currentEventId, - notificationEvent -> - 
tableName.getDb().equalsIgnoreCase(notificationEvent.getDbName()) - && tableName.getTbl().equalsIgnoreCase(notificationEvent.getTableName()) - && DropPartitionEvent.EVENT_TYPE - .equals(notificationEvent.getEventType())); + List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled( + catalogTimeline, currentEventId, tableName.getDb(), tableName.getTbl(), + DropPartitionEvent.EVENT_TYPE); addDroppedPartitionsFromEvent( ((HdfsTable) tbl).getClusteringColNames(), events, droppedPartsFromEvent); } catch (TException e) { @@ -5462,11 +5472,9 @@ public class CatalogOpExecutor { } } List<NotificationEvent> events = null; - events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId, - event -> "ALTER_TABLE".equals(event.getEventType()) - // the alter table event is generated on the renamed table - && msTbl.getDbName().equalsIgnoreCase(event.getDbName()) - && msTbl.getTableName().equalsIgnoreCase(event.getTableName())); + // the alter table event is generated on the renamed table + events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, eventId, + msTbl.getDbName(), msTbl.getTableName(), AlterTableEvent.EVENT_TYPE); Pair<Long, Pair<org.apache.hadoop.hive.metastore.api.Table, org.apache.hadoop.hive.metastore.api.Table>> renamedTable = getRenamedTableFromEvents(events); diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 81461e633..69b8a084d 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -57,14 +57,10 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; -import 
org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; -import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.io.IOUtils; import org.apache.impala.analysis.FunctionName; import org.apache.impala.analysis.HdfsUri; @@ -323,16 +319,16 @@ public class MetastoreEventsProcessorTest { createTable("testNextMetastoreEvents2", false); List<NotificationEvent> events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( - eventsProcessor_.catalog_, currentEventId, null, 2); + eventsProcessor_.catalog_, currentEventId, /*filter*/null, 2); assertEquals(3, events.size()); events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( - eventsProcessor_.catalog_, currentEventId+1, null, 10); + eventsProcessor_.catalog_, currentEventId+1, /*filter*/null, 10); assertEquals(2, events.size()); events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( - eventsProcessor_.catalog_, currentEventId, null, 3); + eventsProcessor_.catalog_, currentEventId, /*filter*/null, 3); assertEquals(3, events.size()); events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( - eventsProcessor_.catalog_, currentEventId+3, null, 3); + eventsProcessor_.catalog_, currentEventId+3, /*filter*/null, 3); assertEquals(0, events.size()); } @@ -3639,7 +3635,6 @@ public class MetastoreEventsProcessorTest { /** * Test fetching events in batch when last occurred event is open transaction - * @throws Exception */ @Test public void testFetchEventsInBatchWithOpenTxnAsLastEvent() throws Exception { @@ -3649,7 +3644,7 @@ public class MetastoreEventsProcessorTest { assertEquals(currentEventId + 1, 
eventsProcessor_.getCurrentEventId()); List<NotificationEvent> events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( - eventsProcessor_.catalog_, currentEventId, null); + eventsProcessor_.catalog_, currentEventId, null, null); // Open transaction event is not returned from metastore assertEquals(0, events.size()); MetastoreShim.commitTransaction(client.getHiveClient(), txnId); @@ -3657,6 +3652,62 @@ public class MetastoreEventsProcessorTest { } } + @Test + public void testNotFetchingUnwantedEvents() throws Exception { + String tblName = "test_event_skip_list"; + createDatabase(TEST_DB_NAME, null); + Map<String, String> params = new HashMap<>(); + params.put("EXTERNAL", "true"); + params.put("external.table.purge", "true"); + createTable(null, TEST_DB_NAME, tblName, params, true, "EXTERNAL_TABLE"); + eventsProcessor_.processEvents(); + + Table tbl = catalog_.getTable(TEST_DB_NAME, tblName); + assertTrue("tbl should be unloaded", tbl instanceof IncompleteTable); + long createEventId = tbl.getCreateEventId(); + assertEquals(eventsProcessor_.getLatestEventId(), createEventId); + + // create 2 partitions and update partition stats using Hive + try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1); + HiveJdbcClientPool.HiveJdbcClient hiveClient = jdbcClientPool.getClient()) { + hiveClient.executeSql("set hive.exec.dynamic.partition.mode=nonstrict"); + hiveClient.executeSql(String.format( + "insert into %s.%s partition(p1) values (0,0,0),(1,1,1)", + TEST_DB_NAME, tblName)); + hiveClient.executeSql(String.format( + "analyze table %s.%s compute statistics for columns", TEST_DB_NAME, tblName)); + } + // All the new events are: + // OPEN_TXN + // ADD_PARTITION one for adding 2 partitions + // COMMIT_TXN + // OPEN_TXN + // ALTER_PARTITION + // ALTER_PARTITION + // UPDATE_PART_COL_STAT_EVENT + // UPDATE_PART_COL_STAT_EVENT + // COMMIT_TXN + // Fetch all events with the default skip list. 
+ List<NotificationEvent> events = + MetastoreEventsProcessor.getNextMetastoreEventsInBatches( + eventsProcessor_.catalog_, createEventId, null); + assertEquals(5, events.size()); + + // Fetch events with a null filter but specifying the wanted event type. + events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( + eventsProcessor_.catalog_, createEventId, /*filter*/null, + MetastoreEvents.CreateTableEvent.EVENT_TYPE); + assertEquals(0, events.size()); + events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( + eventsProcessor_.catalog_, createEventId, /*filter*/null, + MetastoreEvents.AddPartitionEvent.EVENT_TYPE); + assertEquals(1, events.size()); + events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches( + eventsProcessor_.catalog_, createEventId, /*filter*/null, + AlterPartitionEvent.EVENT_TYPE); + assertEquals(2, events.size()); + } + /** * Test getCurrentEventId() throws MetastoreNotificationFetchException when there are * connection issues with HMS. @@ -3702,7 +3753,7 @@ public class MetastoreEventsProcessorTest { try { MetaStoreClientPool badPool = new IncompetentMetastoreClientPool(0, 0); catalog_.setMetaStoreClientPool(badPool); - MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, 0, null); + MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, 0, null, null); } finally { catalog_.setMetaStoreClientPool(origPool); }
