This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit db09d58ef767b2b759792412efcc9481777c464b
Author: stiga-huang <[email protected]>
AuthorDate: Fri Mar 22 21:23:34 2024 +0800

    IMPALA-12933: Avoid fetching unnecessary events of unwanted types
    
    There are several places where catalogd fetches all events of a
    specific type for a table. E.g. in TableLoader#load(), if the table has
    an old createEventId, catalogd fetches all CREATE_TABLE events on the
    table after that createEventId.
    
    Fetching the list of events is expensive since the filtering is done on
    the client side, i.e. catalogd fetches all events and filters them
    locally based on the event type and table name. This could take hours
    if there are many events (e.g. 1M) in HMS.
    
    This patch sets the eventTypeSkipList to the complement of the wanted
    event types, so the get_next_notification RPC can filter out some events
    on the HMS side. To avoid adding too much computation overhead to HMS's
    underlying RDBMS when evaluating predicates of EVENT_TYPE != 'xxx', rare
    event types (e.g. DROP_ISCHEMA) are not added to the list. A new flag,
    common_hms_event_types, is added to specify the common HMS event types.
    
    Once HIVE-28146 is resolved, we can set the wanted types directly in the
    HMS RPC and this approach can be simplified.
    
    UPDATE_TBL_COL_STAT_EVENT and UPDATE_PART_COL_STAT_EVENT are the most
    common event types that Impala does not use. They are added to the
    default skip list alongside OPEN_TXN. A new flag,
    default_skipped_hms_event_types, is added to configure this list.
    
    This patch also fixes an issue where events from non-default catalogs
    were not filtered out.
    
    In a local perf test, I generated 100K RELOAD events after creating a
    table in Hive. I then used the table in Impala to trigger metadata
    loading on it, which fetches the latest CREATE_TABLE event by polling
    all events after the last known CREATE_TABLE event. Before this patch,
    fetching the events took 1s779ms. Now it takes only 395.377ms. Note
    that event messages are usually larger in production environments, so
    the speedup could be even greater.
    
    Tests:
     - Added an FE test
     - Ran CORE tests
    
    Change-Id: Ieabe714328aa2cc605cb62b85ae8aa4bd537dbe9
    Reviewed-on: http://gerrit.cloudera.org:8080/21186
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   |  24 ++++
 be/src/util/backend-gflag-util.cc                  |   4 +
 common/thrift/BackendGflags.thrift                 |   4 +
 .../org/apache/impala/compat/MetastoreShim.java    |   4 +-
 .../org/apache/impala/compat/MetastoreShim.java    |   8 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  32 ++++-
 .../impala/catalog/Hive3MetastoreShimBase.java     |  11 +-
 .../org/apache/impala/catalog/TableLoader.java     |   9 +-
 .../impala/catalog/events/MetastoreEvents.java     |  16 +--
 .../catalog/events/MetastoreEventsProcessor.java   |  95 ++++++++++++---
 .../metastore/CatalogMetastoreServiceHandler.java  |  28 ++---
 .../catalog/metastore/MetastoreServiceHandler.java |  20 +--
 .../org/apache/impala/service/BackendConfig.java   |   8 ++
 .../apache/impala/service/CatalogOpExecutor.java   | 134 +++++++++++----------
 .../events/MetastoreEventsProcessorTest.java       |  73 +++++++++--
 15 files changed, 319 insertions(+), 151 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 9f4d2213e..5f318278a 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -190,6 +190,30 @@ DEFINE_double_hidden(inject_process_event_failure_ratio, 
1.0,
     "event failure. If the generated random number is lesser than this value, 
then we"
     "fail the event processor(EP).");
 
+DEFINE_string(default_skipped_hms_event_types,
+    "OPEN_TXN,UPDATE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT",
+    "HMS event types that are not used by Impala. They are skipped by default 
in "
+    "fetching HMS event batches. Only in few places they will be fetched, e.g. 
fetching "
+    "the latest event time in HMS.");
+DEFINE_string(common_hms_event_types, 
"ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,"
+    
"ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,"
+    
"CREATE_DATABASE,ALTER_DATABASE,DROP_DATABASE,INSERT,OPEN_TXN,COMMIT_TXN,ABORT_TXN,"
+    "ALLOC_WRITE_ID_EVENT,ACID_WRITE_EVENT,BATCH_ACID_WRITE_EVENT,"
+    
"UPDATE_TBL_COL_STAT_EVENT,DELETE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT,"
+    
"UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,COMMIT_COMPACTION_EVENT,"
+    "RELOAD",
+    "Common HMS event types that will be used in eventTypeSkipList when 
fetching events "
+    "from HMS. The strings come from constants in "
+    "org.apache.hadoop.hive.metastore.messaging.MessageBuilder. When bumping 
Hive "
+    "versions, the list might need to be updated accordingly. To avoid 
bringing too much "
+    "computation overhead to HMS's underlying RDBMS in evaluating predicates 
of "
+    "EVENT_TYPE != 'xxx', rare event types are not tracked in this list. They 
are "
+    
"CREATE_FUNCTION,DROP_FUNCTION,ADD_PRIMARYKEY,ADD_FOREIGNKEY,ADD_UNIQUECONSTRAINT,"
+    "ADD_NOTNULLCONSTRAINT, ADD_DEFAULTCONSTRAINT, ADD_CHECKCONSTRAINT, 
DROP_CONSTRAINT,"
+    "CREATE_ISCHEMA, ALTER_ISCHEMA, DROP_ISCHEMA, ADD_SCHEMA_VERSION,"
+    "ALTER_SCHEMA_VERSION, DROP_SCHEMA_VERSION, CREATE_CATALOG, ALTER_CATALOG,"
+    "DROP_CATALOG, CREATE_DATACONNECTOR, ALTER_DATACONNECTOR, 
DROP_DATACONNECTOR.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_port);
 DECLARE_string(state_store_2_host);
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 6ae72e671..18bd639a8 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -123,6 +123,8 @@ DECLARE_string(inject_process_event_failure_event_types);
 DECLARE_double(inject_process_event_failure_ratio);
 DECLARE_bool(enable_workload_mgmt);
 DECLARE_string(query_log_table_name);
+DECLARE_string(default_skipped_hms_event_types);
+DECLARE_string(common_hms_event_types);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -476,6 +478,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_enable_workload_mgmt(FLAGS_enable_workload_mgmt);
   cfg.__set_query_log_table_name(FLAGS_query_log_table_name);
   cfg.__set_query_cpu_root_factor(FLAGS_query_cpu_root_factor);
+  
cfg.__set_default_skipped_hms_event_types(FLAGS_default_skipped_hms_event_types);
+  cfg.__set_common_hms_event_types(FLAGS_common_hms_event_types);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index e0c89b3bd..285316a3c 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -294,4 +294,8 @@ struct TBackendGflags {
   131: required string query_log_table_name
 
   132: required double query_cpu_root_factor
+
+  133: required string default_skipped_hms_event_types
+
+  134: required string common_hms_event_types
 }
diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 5137beee9..c34d2d8af 100644
--- 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -461,12 +461,12 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
    * @see 
org.apache.hadoop.hive.metastore.HiveMetaStoreClient#getNextNotification
    * @param msClient Metastore client
    * @param eventRequest Notification event request
-   * @param isSkipUnwantedEventTypes Whether to set skip event types in request
+   * @param eventTypeSkipList unused
    * @return NotificationEventResponse
    * @throws TException
    */
   public static NotificationEventResponse getNextNotification(IMetaStoreClient 
msClient,
-      NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes)
+      NotificationEventRequest eventRequest, List<String> eventTypeSkipList)
       throws TException {
     return getThriftClient(msClient).get_next_notification(eventRequest);
   }
diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 9ecad6284..f5354c0a4 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -597,15 +597,15 @@ public class MetastoreShim extends Hive3MetastoreShimBase 
{
    *
    * @param msClient Metastore client
    * @param eventRequest Notification event request
-   * @param isSkipUnwantedEventTypes Whether to set skip event types in request
+   * @param eventTypeSkipList Unwanted event types
    * @return NotificationEventResponse
    * @throws TException
    */
   public static NotificationEventResponse getNextNotification(IMetaStoreClient 
msClient,
-      NotificationEventRequest eventRequest, boolean isSkipUnwantedEventTypes)
+      NotificationEventRequest eventRequest, List<String> eventTypeSkipList)
       throws TException {
-    if (isSkipUnwantedEventTypes) {
-      
eventRequest.setEventTypeSkipList(MetastoreEventsProcessor.getEventSkipList());
+    if (eventTypeSkipList != null) {
+      eventRequest.setEventTypeSkipList(eventTypeSkipList);
     }
     return msClient.getThriftClient().get_next_notification(eventRequest);
   }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java 
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 689bb233b..87a7d59f8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -349,6 +349,16 @@ public class CatalogServiceCatalog extends Catalog {
   // A variable to test expected failed events
   private final Set<String> failureEventsForTesting_;
 
+  // List of event types to skip by default while fetching notification events 
from
+  // metastore.
+  private final List<String> defaultSkippedHmsEventTypes_;
+
+  // List of common known HMS event types. Ignores those that are rare in 
regular (e.g.
+  // daily) jobs. The list is used to generate a complement set for wanted HMS 
event
+  // types. We need this list until HIVE-28146 is resolved. After that we can 
directly
+  // specify what event types we want.
+  private final List<String> commonHmsEventTypes_;
+
   // Total number of dbs, tables and functions in the catalog cache.
   // Updated in each catalog topic update (getCatalogDelta()).
   private int numDbs_ = 0;
@@ -415,6 +425,21 @@ public class CatalogServiceCatalog extends Catalog {
         
Splitter.on(',').trimResults().omitEmptyStrings().split(failureEvents)) {
       failureEventsForTesting_.add(tblProps);
     }
+    defaultSkippedHmsEventTypes_ = Lists.newArrayList();
+    Iterable<String> eventTypes = Splitter.on(',')
+        .trimResults().omitEmptyStrings()
+        .split(BackendConfig.INSTANCE.getDefaultSkippedHmsEventTypes());
+    for (String eventType : eventTypes) {
+      defaultSkippedHmsEventTypes_.add(eventType);
+    }
+    LOG.info("Default skipped HMS event types: " + 
defaultSkippedHmsEventTypes_);
+    commonHmsEventTypes_ = Lists.newArrayList();
+    eventTypes = Splitter.on(',').trimResults().omitEmptyStrings()
+        .split(BackendConfig.INSTANCE.getCommonHmsEventTypes());
+    for (String eventType : eventTypes) {
+      commonHmsEventTypes_.add(eventType);
+    }
+    LOG.info("Common HMS event types: " + commonHmsEventTypes_);
   }
 
   public void startEventsProcessor() {
@@ -4104,7 +4129,6 @@ public class CatalogServiceCatalog extends Catalog {
     this.metastoreEventProcessor_ = metastoreEventProcessor;
   }
 
-  @VisibleForTesting
   public void setCatalogMetastoreServer(ICatalogMetastoreServer 
catalogMetastoreServer) {
     this.catalogMetastoreServer_ = catalogMetastoreServer;
   }
@@ -4141,4 +4165,10 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   public Set<String> getFailureEventsForTesting() { return 
failureEventsForTesting_; }
+
+  public List<String> getDefaultSkippedHmsEventTypes() {
+    return defaultSkippedHmsEventTypes_;
+  }
+
+  public List<String> getCommonHmsEventTypes() { return commonHmsEventTypes_; }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java 
b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
index cdd72aad7..9fb6b7f37 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
@@ -298,14 +298,21 @@ public class Hive3MetastoreShimBase {
 
   // hive-3 introduces a catalog object in hive
   // Impala only supports the default catalog of hive
-  private static final String defaultCatalogName_ = MetaStoreUtils
+  private static final String DEFAULT_CATALOG_NAME = MetaStoreUtils
       .getDefaultCatalog(MetastoreConf.newMetastoreConf());
 
   /**
    * Gets the name of the default catalog from metastore configuration.
    */
   public static String getDefaultCatalogName() {
-    return defaultCatalogName_;
+    return DEFAULT_CATALOG_NAME;
+  }
+
+  /**
+   * Returns whether the catalog name is the default catalog.
+   */
+  public static boolean isDefaultCatalog(String catalogName) {
+    return DEFAULT_CATALOG_NAME.equalsIgnoreCase(catalogName);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java 
b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 79b388cb5..a30313f15 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -24,8 +24,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.impala.catalog.events.EventFactory;
-import org.apache.impala.catalog.events.MetastoreEvents;
 import org.apache.impala.catalog.events.MetastoreEvents.CreateTableEvent;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.common.Metrics;
@@ -101,11 +99,8 @@ public class TableLoader {
         // we are only interested in fetching the events if we have a valid 
eventId
         // for a table. For tables where eventId is unknown are not created by
         // this catalogd and hence the self-event detection logic does not 
apply.
-        events = 
MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_,
-            eventId, notificationEvent -> 
CreateTableEvent.CREATE_TABLE_EVENT_TYPE
-                .equals(notificationEvent.getEventType())
-                && notificationEvent.getDbName().equalsIgnoreCase(db.getName())
-                && notificationEvent.getTableName().equalsIgnoreCase(tblName));
+        events = 
MetastoreEventsProcessor.getNextMetastoreEventsInBatchesForTable(
+            catalog_, eventId, db.getName(), tblName, 
CreateTableEvent.EVENT_TYPE);
         catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
       }
       if (events != null && !events.isEmpty()) {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index bf861957f..726455923 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -83,7 +83,6 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.DebugUtils;
-import org.apache.impala.util.EventSequence;
 import org.apache.impala.util.MetaStoreUtil;
 import org.apache.impala.util.NoOpEventSequence;
 import org.slf4j.LoggerFactory;
@@ -279,8 +278,7 @@ public class MetastoreEvents {
         String catalogName = currentEvent.getCatalogName();
         String eventDb = currentEvent.getDbName();
         String eventTbl = currentEvent.getTableName();
-        if (catalogName != null && !MetastoreShim.getDefaultCatalogName()
-            .equalsIgnoreCase(catalogName)) {
+        if (catalogName != null && 
!MetastoreShim.isDefaultCatalog(catalogName)) {
           // currently Impala doesn't support custom hive catalogs and hence 
we should
           // ignore all the events which are on non-default catalog namespaces.
           LOG.debug(currentEvent.debugString(
@@ -1405,7 +1403,7 @@ public class MetastoreEvents {
    */
   public static class CreateTableEvent extends MetastoreTableEvent {
 
-    public static final String CREATE_TABLE_EVENT_TYPE = "CREATE_TABLE";
+    public static final String EVENT_TYPE = "CREATE_TABLE";
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory 
instead
      */
@@ -1662,6 +1660,7 @@ public class MetastoreEvents {
    * MetastoreEvent for ALTER_TABLE event type
    */
   public static class AlterTableEvent extends MetastoreTableEvent {
+    public static final String EVENT_TYPE = "ALTER_TABLE";
     protected org.apache.hadoop.hive.metastore.api.Table tableBefore_;
     // the table object after alter operation, as parsed from the 
NotificationEvent
     protected org.apache.hadoop.hive.metastore.api.Table tableAfter_;
@@ -2074,7 +2073,7 @@ public class MetastoreEvents {
    */
   public static class DropTableEvent extends MetastoreTableEvent {
 
-    public static final String DROP_TABLE_EVENT_TYPE = "DROP_TABLE";
+    public static final String EVENT_TYPE = "DROP_TABLE";
 
     /**
      * Prevent instantiation from outside should use MetastoreEventFactory 
instead
@@ -2145,7 +2144,7 @@ public class MetastoreEvents {
    */
   public static class CreateDatabaseEvent extends MetastoreDatabaseEvent {
 
-    public static final String CREATE_DATABASE_EVENT_TYPE = "CREATE_DATABASE";
+    public static final String EVENT_TYPE = "CREATE_DATABASE";
     // metastore database object as parsed from NotificationEvent message
     private final Database createdDatabase_;
 
@@ -2294,7 +2293,7 @@ public class MetastoreEvents {
    */
   public static class DropDatabaseEvent extends MetastoreDatabaseEvent {
 
-    public static final String DROP_DATABASE_EVENT_TYPE = "DROP_DATABASE";
+    public static final String EVENT_TYPE = "DROP_DATABASE";
     // Metastore database object as parsed from NotificationEvent message
     private final Database droppedDatabase_;
     /**
@@ -2395,7 +2394,7 @@ public class MetastoreEvents {
   }
   public static class AddPartitionEvent extends MetastoreTableEvent {
 
-    public static final String ADD_PARTITION_EVENT_TYPE = "ADD_PARTITION";
+    public static final String EVENT_TYPE = "ADD_PARTITION";
     private final List<Partition> addedPartitions_;
     private final List<List<TPartitionKeyValue>> partitionKeyVals_;
 
@@ -2501,6 +2500,7 @@ public class MetastoreEvents {
   }
 
   public static class AlterPartitionEvent extends MetastoreTableEvent {
+    public static final String EVENT_TYPE = "ALTER_PARTITION";
     // the Partition object before alter operation, as parsed from the 
NotificationEvent
     private final org.apache.hadoop.hive.metastore.api.Partition 
partitionBefore_;
     // the Partition object after alter operation, as parsed from the 
NotificationEvent
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
index e6a683a3c..8dabf8be2 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEventsProcessor.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
@@ -54,6 +55,7 @@ import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.IncompleteTable;
 import org.apache.impala.catalog.events.ConfigValidator.ValidationResult;
 import org.apache.impala.catalog.events.MetastoreEvents.DropDatabaseEvent;
+import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
 import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.common.Metrics;
@@ -277,19 +279,54 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
 
   private static final long SECOND_IN_NANOS = 1000 * 1000 * 1000L;
 
-  // List of event types to skip while fetching notification events from 
metastore
-  private static final List<String> EVENT_SKIP_LIST = 
Arrays.asList("OPEN_TXN");
+  public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb(
+      CatalogServiceCatalog catalog, long eventId, String dbName, String 
eventType)
+      throws MetastoreNotificationException {
+    return getNextMetastoreEventsInBatchesForDb(catalog, eventId,
+        MetastoreShim.getDefaultCatalogName(), dbName, eventType);
+  }
+
+  public static List<NotificationEvent> getNextMetastoreEventsInBatchesForDb(
+      CatalogServiceCatalog catalog, long eventId, String catName,
+      String dbName, String eventType) throws MetastoreNotificationException {
+    Preconditions.checkNotNull(eventType, "eventType is null in fetching db 
events");
+    Preconditions.checkNotNull(catName, "catName is null in fetching db 
events");
+    Preconditions.checkNotNull(dbName, "dbName is null in fetching db events");
+    NotificationFilter filter = notificationEvent ->
+        eventType.equals(notificationEvent.getEventType())
+            && catName.equalsIgnoreCase(notificationEvent.getCatName())
+            && dbName.equalsIgnoreCase(notificationEvent.getDbName());
+    return getNextMetastoreEventsInBatches(catalog, eventId, filter, 
eventType);
+  }
+
+  public static List<NotificationEvent> 
getNextMetastoreEventsInBatchesForTable(
+      CatalogServiceCatalog catalog, long eventId, String dbName, String 
tblName,
+      String eventType) throws MetastoreNotificationException {
+    return getNextMetastoreEventsInBatchesForTable(catalog, eventId,
+        MetastoreShim.getDefaultCatalogName(), dbName, tblName, eventType);
+  }
+
+  public static List<NotificationEvent> 
getNextMetastoreEventsInBatchesForTable(
+      CatalogServiceCatalog catalog, long eventId, String catName, String 
dbName,
+      String tblName, String eventType) throws MetastoreNotificationException {
+    Preconditions.checkNotNull(eventType, "eventType is null in fetching table 
events");
+    Preconditions.checkNotNull(catName, "catName is null in fetching table 
events");
+    Preconditions.checkNotNull(dbName, "dbName is null in fetching table 
events");
+    Preconditions.checkNotNull(tblName, "tblName is null in fetching table 
events");
+    NotificationFilter filter = notificationEvent ->
+        eventType.equals(notificationEvent.getEventType())
+            && catName.equalsIgnoreCase(notificationEvent.getCatName())
+            && dbName.equalsIgnoreCase(notificationEvent.getDbName())
+            && tblName.equalsIgnoreCase(notificationEvent.getTableName());
+    return getNextMetastoreEventsInBatches(catalog, eventId, filter,
+        EVENTS_BATCH_SIZE_PER_RPC, eventType);
+  }
 
-  /**
-   * Wrapper around {@link
-   * 
MetastoreEventsProcessor#getNextMetastoreEventsInBatches(CatalogServiceCatalog,
-   * long, NotificationFilter, int)} which passes the default batch size.
-   */
   public static List<NotificationEvent> getNextMetastoreEventsInBatches(
-      CatalogServiceCatalog catalog, long eventId, NotificationFilter filter)
-      throws MetastoreNotificationFetchException {
+      CatalogServiceCatalog catalog, long eventId, NotificationFilter filter,
+      String... eventTypes) throws MetastoreNotificationFetchException {
     return getNextMetastoreEventsInBatches(catalog, eventId, filter,
-        EVENTS_BATCH_SIZE_PER_RPC);
+        EVENTS_BATCH_SIZE_PER_RPC, eventTypes);
   }
 
   /**
@@ -309,7 +346,8 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   @VisibleForTesting
   public static List<NotificationEvent> getNextMetastoreEventsInBatches(
       CatalogServiceCatalog catalog, long eventId, NotificationFilter filter,
-      int eventsBatchSize) throws MetastoreNotificationFetchException {
+      int eventsBatchSize, String... eventTypes)
+      throws MetastoreNotificationFetchException {
     Preconditions.checkArgument(eventsBatchSize > 0);
     List<NotificationEvent> result = new ArrayList<>();
     try (MetaStoreClient msc = catalog.getMetaStoreClient()) {
@@ -317,6 +355,17 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
           .getEventId();
       if (toEventId <= eventId) return result;
       long currentEventId = eventId;
+      List<String> eventTypeSkipList = 
catalog.getDefaultSkippedHmsEventTypes();
+      String typeStr = null;
+      if (eventTypes != null && eventTypes.length > 0) {
+        eventTypeSkipList = 
Lists.newArrayList(catalog.getCommonHmsEventTypes());
+        eventTypeSkipList.removeIf(s -> Arrays.asList(eventTypes).contains(s));
+        typeStr = String.join(",", eventTypes) + " ";
+      }
+      LOG.info("Fetching {}events started from id {} to {}. Gap: {}",
+          typeStr == null ? "" : typeStr, eventId, toEventId,
+          toEventId - eventId);
+      int numFilteredEvents = 0;
       while (currentEventId < toEventId) {
         int batchSize = Math
             .min(eventsBatchSize, (int)(toEventId - currentEventId));
@@ -328,17 +377,28 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
         eventRequest.setMaxEvents(batchSize);
         eventRequest.setLastEvent(currentEventId);
         NotificationEventResponse notificationEventResponse =
-            MetastoreShim.getNextNotification(msc.getHiveClient(), 
eventRequest, true);
+            MetastoreShim.getNextNotification(msc.getHiveClient(), 
eventRequest,
+                eventTypeSkipList);
         if (notificationEventResponse.getEvents().isEmpty()) {
           // Possible to receive empty list due to event skip list in request
-          return result;
+          break;
         }
+
         for (NotificationEvent event : notificationEventResponse.getEvents()) {
           // if no filter is provided we add all the events
-          if (filter == null || filter.accept(event)) result.add(event);
+          if (filter == null || filter.accept(event)) {
+            result.add(event);
+          } else {
+            numFilteredEvents++;
+          }
           currentEventId = event.getEventId();
         }
       }
+      if (numFilteredEvents > 0) {
+        LOG.info("Got {} events and filtered out {} locally from {} events 
start " +
+                "from id {}",
+            result.size(), numFilteredEvents, toEventId - eventId, eventId + 
1);
+      }
       return result;
     } catch (MetastoreClientInstantiationException | TException e) {
       throw new MetastoreNotificationFetchException(String.format(
@@ -785,7 +845,7 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
     eventRequest.setMaxEvents(1);
     try {
       NotificationEventResponse response = MetastoreShim.getNextNotification(
-          msClient.getHiveClient(), eventRequest, false);
+          msClient.getHiveClient(), eventRequest, null);
       Iterator<NotificationEvent> eventIter = response.getEventsIterator();
       if (!eventIter.hasNext()) {
         LOG.warn("Unable to fetch event {}. It has been cleaned up", eventId);
@@ -917,7 +977,8 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
       eventRequest.setLastEvent(eventId);
       eventRequest.setMaxEvents(batchSize);
       NotificationEventResponse response =
-          MetastoreShim.getNextNotification(msClient.getHiveClient(), 
eventRequest, true);
+          MetastoreShim.getNextNotification(msClient.getHiveClient(), 
eventRequest,
+              catalog_.getDefaultSkippedHmsEventTypes());
       LOG.info(String.format("Received %d events. Start event id : %d",
           response.getEvents().size(), eventId));
       if (filter == null) return response.getEvents();
@@ -1373,6 +1434,4 @@ public class MetastoreEventsProcessor implements 
ExternalEventsProcessor {
   public static MessageDeserializer getMessageDeserializer() {
     return MESSAGE_DESERIALIZER;
   }
-
-  public static List<String> getEventSkipList() { return EVENT_SKIP_LIST; }
 }
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
 
b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
index a5b2114c7..8df9673ff 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -214,13 +214,9 @@ public class CatalogMetastoreServiceHandler extends 
MetastoreServiceHandler {
           throw e;
         }
       }
-      List<NotificationEvent> events =
-          MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, 
fromEventId,
-              notificationEvent ->
-                  
MetastoreEvents.CreateDatabaseEvent.CREATE_DATABASE_EVENT_TYPE
-                      .equals(notificationEvent.getEventType())
-                      && 
dbName.equalsIgnoreCase(notificationEvent.getDbName()));
-
+      List<NotificationEvent> events = MetastoreEventsProcessor
+          .getNextMetastoreEventsInBatchesForDb(catalog_, fromEventId, dbName,
+              MetastoreEvents.CreateDatabaseEvent.EVENT_TYPE);
       Preconditions.checkArgument(events.size() == 1,
           "Db %s was recreated in metastore " +
               "while the current db creation was in progress", dbName);
@@ -1330,12 +1326,11 @@ public class CatalogMetastoreServiceHandler extends 
MetastoreServiceHandler {
       T resp = task.execute();
       // Rename scenario, remove old table and add new one
       try {
+        // the alter table event is generated on the renamed table
         List<NotificationEvent> events =
-            MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_,
-                currentEventId, event -> 
"ALTER_TABLE".equals(event.getEventType())
-                // the alter table event is generated on the renamed table
-                && newTable.getDbName().equalsIgnoreCase(event.getDbName())
-                && 
newTable.getTableName().equalsIgnoreCase(event.getTableName()));
+            
MetastoreEventsProcessor.getNextMetastoreEventsInBatchesForTable(catalog_,
+                currentEventId, newTable.getDbName(), newTable.getTableName(),
+                MetastoreEvents.AlterTableEvent.EVENT_TYPE);
         Preconditions.checkState(events.size() == 1, String.format("For table 
%s.%s, "
             + "from event id: %s, expected ALTER_TABLE events size to be 1 but 
is %s",
             newTable.getDbName(), newTable.getTableName(), currentEventId,
@@ -1418,13 +1413,8 @@ public class CatalogMetastoreServiceHandler extends 
MetastoreServiceHandler {
             "exception {} from metastore", dbName, tblName, 
e.getClass().getName());
       }
       List<NotificationEvent> events =
-          MetastoreEventsProcessor
-              .getNextMetastoreEventsInBatches(catalog_, fromEventId,
-                  event -> 
MetastoreEvents.CreateTableEvent.CREATE_TABLE_EVENT_TYPE
-                      .equals(event.getEventType())
-                      && dbName.equalsIgnoreCase(event.getDbName())
-                      && tblName.equalsIgnoreCase(event.getTableName()));
-
+          
MetastoreEventsProcessor.getNextMetastoreEventsInBatchesForTable(catalog_,
+              fromEventId, dbName, tblName, 
MetastoreEvents.CreateTableEvent.EVENT_TYPE);
       Preconditions.checkState(events.size() == 1,
           "Table %s.%s was recreated in metastore since event id %s" +
               "while the current table creation was in progress", dbName, 
tblName,
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
 
b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
index 859d71c49..2076457d3 100644
--- 
a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
+++ 
b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -152,7 +152,6 @@ import 
org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.ListPackageRequest;
-import org.apache.hadoop.hive.metastore.api.ListStoredProcedureRequest;
 import org.apache.hadoop.hive.metastore.api.LockRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.MapSchemaVersionToSerdeRequest;
@@ -290,8 +289,6 @@ import 
org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.events.EventFactory;
 import org.apache.impala.catalog.events.MetastoreEvents;
 import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
-import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
 import org.apache.impala.catalog.events.MetastoreEventsProcessor;
 import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Reference;
@@ -3198,12 +3195,8 @@ public abstract class MetastoreServiceHandler extends 
AbstractThriftHiveMetastor
     dbName = catAndDbName[1];
     try {
       List<NotificationEvent> events = MetastoreEventsProcessor
-          .getNextMetastoreEventsInBatches(catalog_, beforeDropEventId,
-              event -> event.getEventType()
-                  .equalsIgnoreCase(DropTableEvent.DROP_TABLE_EVENT_TYPE)
-                  && catName.equalsIgnoreCase(event.getCatName())
-                  && dbName.equalsIgnoreCase(event.getDbName())
-                  && tableName.equalsIgnoreCase(event.getTableName()));
+          .getNextMetastoreEventsInBatchesForTable(catalog_, beforeDropEventId,
+              catName, dbName, tableName, DropTableEvent.EVENT_TYPE);
       if (events.isEmpty()) {
         throw new MetaException(
             "Drop table event not received. Check if notification events are "
@@ -3294,13 +3287,8 @@ public abstract class MetastoreServiceHandler extends 
AbstractThriftHiveMetastor
 
     try {
       List<NotificationEvent> events = MetastoreEventsProcessor
-          .getNextMetastoreEventsInBatches(catalog_, beforeDropEventId,
-              event -> event.getEventType()
-                  .equalsIgnoreCase(MetastoreEvents.DropDatabaseEvent
-                      .DROP_DATABASE_EVENT_TYPE)
-                  && catName.equalsIgnoreCase(event.getCatName())
-                  && dbName.equalsIgnoreCase(event.getDbName()));
-
+          .getNextMetastoreEventsInBatchesForDb(catalog_, beforeDropEventId, 
catName,
+              dbName, MetastoreEvents.DropDatabaseEvent.EVENT_TYPE);
       if (events.size() == 0) {
         if (ignoreUnknownDb) {
           LOG.debug("db {} does not exist in metastore. Removing it from 
catalog if "
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index ed5a5fb90..2add99a2f 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -498,4 +498,12 @@ public class BackendConfig {
   }
 
   public double getQueryCpuRootFactor() { return 
backendCfg_.query_cpu_root_factor; }
+
+  public String getDefaultSkippedHmsEventTypes() {
+    return backendCfg_.default_skipped_hms_event_types;
+  }
+
+  public String getCommonHmsEventTypes() {
+    return backendCfg_.common_hms_event_types;
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 85722ffe7..05043b99f 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2128,12 +2128,8 @@ public class CatalogOpExecutor {
           long eventId = getCurrentEventId(msClient, catalogTimeline);
           msClient.getHiveClient().createDatabase(db);
           catalogTimeline.markEvent("Created database in Metastore");
-          List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(
-              catalogTimeline, eventId,
-              notificationEvent ->
-                  CreateDatabaseEvent.CREATE_DATABASE_EVENT_TYPE
-                      .equals(notificationEvent.getEventType())
-                      && 
dbName.equalsIgnoreCase(notificationEvent.getDbName()));
+          List<NotificationEvent> events = 
getNextMetastoreEventsForDbIfEnabled(
+              catalogTimeline, eventId, dbName, 
CreateDatabaseEvent.EVENT_TYPE);
           Pair<Long, Database> eventDbPair = getDatabaseFromEvents(events,
               params.if_not_exists);
           if (eventDbPair == null) {
@@ -2196,16 +2192,53 @@ public class CatalogOpExecutor {
 
   /**
    * Wrapper around
-   * {@code MetastoreEventsProcessor#getNextMetastoreEventsInBatches} with the
+   * {@code MetastoreEventsProcessor#getNextMetastoreEventsInBatchesForTable} 
with the
    * addition that it checks if events processing is active or not. If not 
active,
-   * returns an empty list. Also updates the given 'catalogTimeline'
+   * returns an empty list. Also updates the given 'catalogTimeline'.
    */
-  private List<NotificationEvent> getNextMetastoreEventsIfEnabled(
-      EventSequence catalogTimeline, long eventId, NotificationFilter 
eventsFilter)
+  private List<NotificationEvent> getNextMetastoreEventsForTableIfEnabled(
+      EventSequence catalogTimeline, long eventId, String dbName, String 
tblName,
+      String eventType) throws MetastoreNotificationException {
+    if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+    List<NotificationEvent> events = MetastoreEventsProcessor
+        .getNextMetastoreEventsInBatchesForTable(catalog_, eventId, dbName, 
tblName,
+            eventType);
+    catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
+    return events;
+  }
+
+  /**
+   * Wrapper around
+   * {@code MetastoreEventsProcessor#getNextMetastoreEventsInBatchesForDb} 
with the
+   * addition that it checks if events processing is active or not. If not 
active,
+   * returns an empty list. Also updates the given 'catalogTimeline'.
+   */
+  private List<NotificationEvent> getNextMetastoreEventsForDbIfEnabled(
+      EventSequence catalogTimeline, long eventId, String dbName, String 
eventType)
       throws MetastoreNotificationException {
     if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
     List<NotificationEvent> events = MetastoreEventsProcessor
-        .getNextMetastoreEventsInBatches(catalog_, eventId, eventsFilter);
+        .getNextMetastoreEventsInBatchesForDb(catalog_, eventId, dbName, 
eventType);
+    catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
+    return events;
+  }
+
+  /**
+   * Fetches DropDatabase and DropTable events of a db if events 
processing is active.
+   * Returns an empty list if not active. Also updates the given 
'catalogTimeline'.
+   */
+  private List<NotificationEvent> getNextMetastoreDropEventsForDbIfEnabled(
+      EventSequence catalogTimeline, long eventId, String dbName)
+      throws MetastoreNotificationException {
+    if (!catalog_.isEventProcessingActive()) return Collections.emptyList();
+    List<String> eventTypes = Lists.newArrayList(
+        DropDatabaseEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE);
+    NotificationFilter filter = e -> dbName.equalsIgnoreCase(e.getDbName())
+        && MetastoreShim.isDefaultCatalog(e.getCatName())
+        && eventTypes.contains(e.getEventType());
+    List<NotificationEvent> events = MetastoreEventsProcessor
+        .getNextMetastoreEventsInBatches(catalog_, eventId, filter,
+            DropDatabaseEvent.EVENT_TYPE, DropTableEvent.EVENT_TYPE);
     catalogTimeline.markEvent(FETCHED_HMS_EVENT_BATCH);
     return events;
   }
@@ -2236,7 +2269,9 @@ public class CatalogOpExecutor {
       MetastoreEvent event = catalog_
           .getMetastoreEventProcessor().getEventsFactory()
           .get(events.get(events.size() - 1), null);
-      Preconditions.checkState(event instanceof CreateDatabaseEvent);
+      Preconditions.checkState(event instanceof CreateDatabaseEvent,
+          "Expects CreateDatabaseEvent but got %s. All events: %s",
+          event, events);
       return new Pair<>(events.get(0).getEventId(),
           ((CreateDatabaseEvent) event).getDatabase());
     } catch (MetastoreNotificationException e) {
@@ -2746,11 +2781,8 @@ public class CatalogOpExecutor {
             dbName, /* deleteData */true, /* ignoreIfUnknown */false,
             params.cascade);
         catalogTimeline.markEvent("Dropped database in Metastore");
-        List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(
-            catalogTimeline, eventId,
-            event -> dbName.equalsIgnoreCase(event.getDbName()) && (
-                
DropDatabaseEvent.DROP_DATABASE_EVENT_TYPE.equals(event.getEventType()) ||
-                    
DropTableEvent.DROP_TABLE_EVENT_TYPE.equals(event.getEventType())));
+        List<NotificationEvent> events = 
getNextMetastoreDropEventsForDbIfEnabled(
+            catalogTimeline, eventId, dbName);
         addToDeleteEventLog(events);
         addSummary(resp, "Database has been dropped.");
       } catch (TException e) {
@@ -2801,12 +2833,12 @@ public class CatalogOpExecutor {
     for (NotificationEvent event : events) {
       String eventType = event.getEventType();
       Preconditions.checkState(
-          eventType.equals(DropDatabaseEvent.DROP_DATABASE_EVENT_TYPE) ||
-          eventType.equals(DropTableEvent.DROP_TABLE_EVENT_TYPE) ||
+          eventType.equals(DropDatabaseEvent.EVENT_TYPE) ||
+          eventType.equals(DropTableEvent.EVENT_TYPE) ||
           eventType.equals(DropPartitionEvent.EVENT_TYPE), "Can not add event 
type: " +
               "%s to deleteEventLog", eventType);
       String key;
-      if 
(DropDatabaseEvent.DROP_DATABASE_EVENT_TYPE.equals(event.getEventType())) {
+      if (DropDatabaseEvent.EVENT_TYPE.equals(event.getEventType())) {
         key = DeleteEventLog.getDbKey(event.getDbName());
       } else {
         Preconditions.checkNotNull(event.getTableName());
@@ -3073,12 +3105,9 @@ public class CatalogOpExecutor {
               String.format(HMS_RPC_ERROR_FORMAT_STR, "dropTable"), e);
         }
       }
-      List<NotificationEvent> events;
-      final org.apache.hadoop.hive.metastore.api.Table finalMsTbl = msTbl;
-      events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId,
-          event -> 
DropTableEvent.DROP_TABLE_EVENT_TYPE.equals(event.getEventType())
-              && finalMsTbl.getDbName().equalsIgnoreCase(event.getDbName())
-              && 
finalMsTbl.getTableName().equalsIgnoreCase(event.getTableName()));
+      List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled(
+          catalogTimeline, eventId, tableName.getDb(), tableName.getTbl(),
+          DropTableEvent.EVENT_TYPE);
       addSummary(resp, (params.is_table ? "Table " : "View ") + "has been 
dropped.");
       addToDeleteEventLog(events);
       Table table = catalog_.removeTable(params.getTable_name().db_name,
@@ -3714,12 +3743,8 @@ public class CatalogOpExecutor {
             addSummary(response, "Table already exists.");
             return false;
           }
-          events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId,
-                  event -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
-                      .equals(event.getEventType())
-                      && 
newTable.getDbName().equalsIgnoreCase(event.getDbName())
-                      && newTable.getTableName()
-                      .equalsIgnoreCase(event.getTableName()));
+          events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, 
eventId,
+              newTable.getDbName(), newTable.getTableName(), 
CreateTableEvent.EVENT_TYPE);
         }
       }
       // in case of synchronized tables it is possible that Kudu doesn't 
generate
@@ -3798,15 +3823,9 @@ public class CatalogOpExecutor {
         }
         catalogTimeline.markEvent(CREATED_HMS_TABLE);
         addSummary(response, "Table has been created.");
-        final org.apache.hadoop.hive.metastore.api.Table finalNewTable = 
newTable;
-        List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(
-            catalogTimeline, eventId,
-            notificationEvent -> CreateTableEvent.CREATE_TABLE_EVENT_TYPE
-                .equals(notificationEvent.getEventType())
-                && finalNewTable.getDbName()
-                .equalsIgnoreCase(notificationEvent.getDbName())
-                && finalNewTable.getTableName()
-                .equalsIgnoreCase(notificationEvent.getTableName()));
+        List<NotificationEvent> events = 
getNextMetastoreEventsForTableIfEnabled(
+            catalogTimeline, eventId, newTable.getDbName(), 
newTable.getTableName(),
+            CreateTableEvent.EVENT_TYPE);
         eventIdTblPair = getTableFromEvents(events, if_not_exists);
         if (eventIdTblPair == null) {
           // TODO (HIVE-21807): Creating a table and retrieving the table 
information is
@@ -4021,10 +4040,8 @@ public class CatalogOpExecutor {
             msClient.getHiveClient().createTable(newTable);
             catalogTimeline.markEvent(CREATED_HMS_TABLE);
           }
-          events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId, 
event ->
-              
CreateTableEvent.CREATE_TABLE_EVENT_TYPE.equals(event.getEventType())
-                  && newTable.getDbName().equalsIgnoreCase(event.getDbName())
-                  && 
newTable.getTableName().equalsIgnoreCase(event.getTableName()));
+          events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, 
eventId,
+              newTable.getDbName(), newTable.getTableName(), 
CreateTableEvent.EVENT_TYPE);
         } else {
           addSummary(response, "Table already exists.");
           return false;
@@ -5126,12 +5143,9 @@ public class CatalogOpExecutor {
         LOG.info("Added {}/{} partitions in HMS for table {}", numDone,
             allHmsPartitionsToAdd.size(), tbl.getFullName());
         org.apache.hadoop.hive.metastore.api.Table msTbl = 
tbl.getMetaStoreTable();
-        List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(
-            catalogTimeline, eventId,
-            event -> AddPartitionEvent.ADD_PARTITION_EVENT_TYPE
-                .equals(event.getEventType())
-                && msTbl.getDbName().equalsIgnoreCase(event.getDbName())
-                && 
msTbl.getTableName().equalsIgnoreCase(event.getTableName()));
+        List<NotificationEvent> events = 
getNextMetastoreEventsForTableIfEnabled(
+            catalogTimeline, eventId, msTbl.getDbName(), msTbl.getTableName(),
+            AddPartitionEvent.EVENT_TYPE);
         Map<Partition, Long> partitionToEventSubMap = Maps.newHashMap();
         getPartitionsFromEvent(events, partitionToEventSubMap);
         // set the eventId to last one which we received so the we fetch the 
next
@@ -5351,13 +5365,9 @@ public class CatalogOpExecutor {
         }
         catalogTimeline.markEvent("Dropped partitions in Metastore");
       }
-      List<NotificationEvent> events = getNextMetastoreEventsIfEnabled(
-          catalogTimeline, currentEventId,
-          notificationEvent ->
-              tableName.getDb().equalsIgnoreCase(notificationEvent.getDbName())
-                  && 
tableName.getTbl().equalsIgnoreCase(notificationEvent.getTableName())
-                  && DropPartitionEvent.EVENT_TYPE
-                  .equals(notificationEvent.getEventType()));
+      List<NotificationEvent> events = getNextMetastoreEventsForTableIfEnabled(
+          catalogTimeline, currentEventId, tableName.getDb(), 
tableName.getTbl(),
+          DropPartitionEvent.EVENT_TYPE);
       addDroppedPartitionsFromEvent(
           ((HdfsTable) tbl).getClusteringColNames(), events, 
droppedPartsFromEvent);
     } catch (TException e) {
@@ -5462,11 +5472,9 @@ public class CatalogOpExecutor {
       }
     }
     List<NotificationEvent> events = null;
-    events = getNextMetastoreEventsIfEnabled(catalogTimeline, eventId,
-        event -> "ALTER_TABLE".equals(event.getEventType())
-            // the alter table event is generated on the renamed table
-            && msTbl.getDbName().equalsIgnoreCase(event.getDbName())
-            && msTbl.getTableName().equalsIgnoreCase(event.getTableName()));
+    // the alter table event is generated on the renamed table
+    events = getNextMetastoreEventsForTableIfEnabled(catalogTimeline, eventId,
+        msTbl.getDbName(), msTbl.getTableName(), AlterTableEvent.EVENT_TYPE);
     Pair<Long, Pair<org.apache.hadoop.hive.metastore.api.Table,
         org.apache.hadoop.hive.metastore.api.Table>> renamedTable =
         getRenamedTableFromEvents(events);
diff --git 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
index 81461e633..69b8a084d 100644
--- 
a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
+++ 
b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java
@@ -57,14 +57,10 @@ import 
org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
-import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.HdfsUri;
@@ -323,16 +319,16 @@ public class MetastoreEventsProcessorTest {
     createTable("testNextMetastoreEvents2", false);
     List<NotificationEvent> events =
         MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
-        eventsProcessor_.catalog_, currentEventId, null, 2);
+        eventsProcessor_.catalog_, currentEventId, /*filter*/null, 2);
     assertEquals(3, events.size());
     events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
-        eventsProcessor_.catalog_, currentEventId+1, null, 10);
+        eventsProcessor_.catalog_, currentEventId+1, /*filter*/null, 10);
     assertEquals(2, events.size());
     events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
-        eventsProcessor_.catalog_, currentEventId, null, 3);
+        eventsProcessor_.catalog_, currentEventId, /*filter*/null, 3);
     assertEquals(3, events.size());
     events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
-        eventsProcessor_.catalog_, currentEventId+3, null, 3);
+        eventsProcessor_.catalog_, currentEventId+3, /*filter*/null, 3);
     assertEquals(0, events.size());
   }
 
@@ -3639,7 +3635,6 @@ public class MetastoreEventsProcessorTest {
 
   /**
    * Test fetching events in batch when last occurred event is open transaction
-   * @throws Exception
    */
   @Test
   public void testFetchEventsInBatchWithOpenTxnAsLastEvent() throws Exception {
@@ -3649,7 +3644,7 @@ public class MetastoreEventsProcessorTest {
       assertEquals(currentEventId + 1, eventsProcessor_.getCurrentEventId());
       List<NotificationEvent> events =
           MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
-              eventsProcessor_.catalog_, currentEventId, null);
+              eventsProcessor_.catalog_, currentEventId, null, null);
       // Open transaction event is not returned from metastore
       assertEquals(0, events.size());
       MetastoreShim.commitTransaction(client.getHiveClient(), txnId);
@@ -3657,6 +3652,62 @@ public class MetastoreEventsProcessorTest {
     }
   }
 
+  @Test
+  public void testNotFetchingUnwantedEvents() throws Exception {
+    String tblName = "test_event_skip_list";
+    createDatabase(TEST_DB_NAME, null);
+    Map<String, String> params = new HashMap<>();
+    params.put("EXTERNAL", "true");
+    params.put("external.table.purge", "true");
+    createTable(null, TEST_DB_NAME, tblName, params, true, "EXTERNAL_TABLE");
+    eventsProcessor_.processEvents();
+
+    Table tbl = catalog_.getTable(TEST_DB_NAME, tblName);
+    assertTrue("tbl should be unloaded", tbl instanceof IncompleteTable);
+    long createEventId = tbl.getCreateEventId();
+    assertEquals(eventsProcessor_.getLatestEventId(), createEventId);
+
+    // create 2 partitions and update partition stats using Hive
+    try (HiveJdbcClientPool jdbcClientPool = HiveJdbcClientPool.create(1);
+         HiveJdbcClientPool.HiveJdbcClient hiveClient = 
jdbcClientPool.getClient()) {
+      hiveClient.executeSql("set hive.exec.dynamic.partition.mode=nonstrict");
+      hiveClient.executeSql(String.format(
+          "insert into %s.%s partition(p1) values (0,0,0),(1,1,1)",
+          TEST_DB_NAME, tblName));
+      hiveClient.executeSql(String.format(
+          "analyze table %s.%s compute statistics for columns", TEST_DB_NAME, 
tblName));
+    }
+    // All the new events are:
+    // OPEN_TXN
+    // ADD_PARTITION (a single event that adds both partitions)
+    // COMMIT_TXN
+    // OPEN_TXN
+    // ALTER_PARTITION
+    // ALTER_PARTITION
+    // UPDATE_PART_COL_STAT_EVENT
+    // UPDATE_PART_COL_STAT_EVENT
+    // COMMIT_TXN
+    // Fetch all events with the default skip list.
+    List<NotificationEvent> events =
+        MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
+            eventsProcessor_.catalog_, createEventId, null);
+    assertEquals(5, events.size());
+
+    // Fetch events with a null filter but specifying the wanted event type.
+    events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
+        eventsProcessor_.catalog_, createEventId, /*filter*/null,
+        MetastoreEvents.CreateTableEvent.EVENT_TYPE);
+    assertEquals(0, events.size());
+    events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
+        eventsProcessor_.catalog_, createEventId, /*filter*/null,
+        MetastoreEvents.AddPartitionEvent.EVENT_TYPE);
+    assertEquals(1, events.size());
+    events = MetastoreEventsProcessor.getNextMetastoreEventsInBatches(
+        eventsProcessor_.catalog_, createEventId, /*filter*/null,
+        AlterPartitionEvent.EVENT_TYPE);
+    assertEquals(2, events.size());
+  }
+
   /**
    * Test getCurrentEventId() throws MetastoreNotificationFetchException when 
there are
    * connection issues with HMS.
@@ -3702,7 +3753,7 @@ public class MetastoreEventsProcessorTest {
     try {
       MetaStoreClientPool badPool = new IncompetentMetastoreClientPool(0, 0);
       catalog_.setMetaStoreClientPool(badPool);
-      MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, 0, 
null);
+      MetastoreEventsProcessor.getNextMetastoreEventsInBatches(catalog_, 0, 
null, null);
     } finally {
       catalog_.setMetaStoreClientPool(origPool);
     }

Reply via email to