This is an automated email from the ASF dual-hosted git repository.

csringhofer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new b67a9cecb IMPALA-13593: Enable event processor to consume 
ALTER_PARTITIONS events from metastore
b67a9cecb is described below

commit b67a9cecb33a7241ad99b33262173dd0287cfe3b
Author: Sai Hemanth Gantasala <saihema...@cloudera.com>
AuthorDate: Mon Dec 9 14:41:00 2024 -0800

    IMPALA-13593: Enable event processor to consume ALTER_PARTITIONS events
    from metastore
    
    HIVE-27746 introduced the ALTER_PARTITIONS event type, an optimization
    that collapses a bulk set of ALTER_PARTITION events into a single
    event. The component versions are updated to pick up this change.
    Supporting it in Impala significantly reduces the number of events the
    event processor has to consume and helps it catch up with events more
    quickly.
    
    This patch enables the event processor to consume ALTER_PARTITIONS
    events. The downside is that the event message contains no
    before_partitions object, so partitions may be refreshed even for
    trivial changes to them. HIVE-29141 will address this concern.
    
    Testing:
    - Added an end-to-end test to verify consuming the ALTER_PARTITIONS
    event. Larger timeouts are used in this test because flakiness was
    observed when running it in a loop.
    
    Change-Id: I009a87ef5e2c331272f9e2d7a6342cc860e64737
    Reviewed-on: http://gerrit.cloudera.org:8080/22554
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Reviewed-by: Csaba Ringhofer <csringho...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |   5 +-
 bin/create-test-configuration.sh                   |   7 +
 bin/impala-config.sh                               |  22 ++--
 .../org/apache/impala/compat/MetastoreShim.java    |   8 ++
 .../org/apache/impala/compat/MetastoreShim.java    |  34 +++++
 .../impala/catalog/Hive3MetastoreShimBase.java     |  39 ++++++
 .../impala/catalog/events/MetastoreEvents.java     | 144 +++++++++++++++------
 fe/src/test/resources/hive-site.xml.py             |   5 +
 tests/custom_cluster/test_events_custom_configs.py |  63 +++++++++
 9 files changed, 275 insertions(+), 52 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 63b054109..2208bca77 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -241,12 +241,11 @@ DEFINE_string(default_skipped_hms_event_types,
     "the latest event time in HMS.");
 
 DEFINE_string(common_hms_event_types, "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,"
-    "ADD_PARTITION,ALTER_PARTITION,DROP_PARTITION,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,"
+    "ALTER_PARTITIONS,CREATE_TABLE,ALTER_TABLE,DROP_TABLE,RELOAD,COMMIT_COMPACTION_EVENT,"
     "CREATE_DATABASE,ALTER_DATABASE,DROP_DATABASE,INSERT,OPEN_TXN,COMMIT_TXN,ABORT_TXN,"
     "ALLOC_WRITE_ID_EVENT,ACID_WRITE_EVENT,BATCH_ACID_WRITE_EVENT,"
     "UPDATE_TBL_COL_STAT_EVENT,DELETE_TBL_COL_STAT_EVENT,UPDATE_PART_COL_STAT_EVENT,"
-    "UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT,COMMIT_COMPACTION_EVENT,"
-    "RELOAD",
+    "UPDATE_PART_COL_STAT_EVENT_BATCH,DELETE_PART_COL_STAT_EVENT",
     "Common HMS event types that will be used in eventTypeSkipList when fetching events "
     "from HMS. The strings come from constants in "
     "org.apache.hadoop.hive.metastore.messaging.MessageBuilder. When bumping Hive "
diff --git a/bin/create-test-configuration.sh b/bin/create-test-configuration.sh
index f8c376e92..6eb51fde7 100755
--- a/bin/create-test-configuration.sh
+++ b/bin/create-test-configuration.sh
@@ -169,6 +169,13 @@ rm -f hive-site-housekeeping-on/hive-site.xml
 ln -s "${CONFIG_DIR}/hive-site_housekeeping_on.xml" \
     hive-site-housekeeping-on/hive-site.xml
 
+export HIVE_VARIANT=events_config_change
+$IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py 
hive-site_events_config.xml
+mkdir -p hive-site-events-config
+rm -f hive-site-events-config/hive-site.xml
+ln -s "${CONFIG_DIR}/hive-site_events_config.xml" \
+    hive-site-events-config/hive-site.xml
+
 export HIVE_VARIANT=ranger_auth
 HIVE_RANGER_CONF_DIR=hive-site-ranger-auth
 $IMPALA_HOME/bin/generate_xml_config.py hive-site.xml.py 
hive-site_ranger_auth.xml
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 88aab1813..8a4ec56e6 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -240,19 +240,19 @@ fi
 : ${IMPALA_TOOLCHAIN_HOST:=native-toolchain.s3.amazonaws.com}
 export IMPALA_TOOLCHAIN_HOST
 
-export CDP_BUILD_NUMBER=58457853
+export CDP_BUILD_NUMBER=66846208
 export CDP_MAVEN_REPOSITORY=\
 
"https://${IMPALA_TOOLCHAIN_HOST}/build/cdp_components/${CDP_BUILD_NUMBER}/maven";
-export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.0-160
-export CDP_HADOOP_VERSION=3.1.1.7.3.1.0-160
-export CDP_HBASE_VERSION=2.4.17.7.3.1.0-160
-export CDP_HIVE_VERSION=3.1.3000.7.3.1.0-160
-export CDP_ICEBERG_VERSION=1.3.1.7.3.1.0-160
-export CDP_KNOX_VERSION=2.0.0.7.3.1.0-160
-export CDP_OZONE_VERSION=1.3.0.7.3.1.0-160
-export CDP_PARQUET_VERSION=1.12.3.7.3.1.0-160
-export CDP_RANGER_VERSION=2.4.0.7.3.1.0-160
-export CDP_TEZ_VERSION=0.9.1.7.3.1.0-160
+export CDP_AVRO_JAVA_VERSION=1.11.1.7.3.1.500-30
+export CDP_HADOOP_VERSION=3.1.1.7.3.1.500-30
+export CDP_HBASE_VERSION=2.4.17.7.3.1.500-30
+export CDP_HIVE_VERSION=3.1.3000.7.3.1.500-30
+export CDP_ICEBERG_VERSION=1.3.1.7.3.1.500-30
+export CDP_KNOX_VERSION=2.0.0.7.3.1.500-30
+export CDP_OZONE_VERSION=1.4.0.7.3.1.500-30
+export CDP_PARQUET_VERSION=1.12.3.7.3.1.500-30
+export CDP_RANGER_VERSION=2.4.0.7.3.1.500-30
+export CDP_TEZ_VERSION=0.9.1.7.3.1.500-30
 
 # Ref: https://infra.apache.org/release-download-pages.html#closer
 : ${APACHE_MIRROR:="https://www.apache.org/dyn/closer.cgi"}
diff --git 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index ac0a0657c..24fe4933b 100644
--- 
a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ 
b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -448,6 +448,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     throw new UnsupportedOperationException("Reload event is not supported.");
   }
 
+  /**
+   *   CDP Hive-3 only function. Always throws UnsupportedOperationException here.
+   */
+  public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent(
+      NotificationEvent event) throws MetastoreNotificationException {
+    throw new UnsupportedOperationException("AlterPartitions event is not 
supported.");
+  }
+
   /**
    *   CDP Hive-3 only function.
    */
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index e80669919..10f6c3b6c 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.api.WriteNotificationLogBatchRequest;
 import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.AlterPartitionsMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.CommitCompactionMessage;
@@ -606,6 +607,39 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
     return updatedFields;
   }
 
+  /**
+   *  Extracts the table, partitions, and isTruncateOp fields from an ALTER_PARTITIONS
+   *  notification event and returns them in an AlterPartitionsInfo object.
+   *
+   * @param event Metastore notification event of type ALTER_PARTITIONS.
+   * @return an AlterPartitionsInfo object describing the altered partitions.
+   */
+  public static AlterPartitionsInfo getFieldsFromAlterPartitionsEvent(
+      NotificationEvent event) throws MetastoreNotificationException {
+    Preconditions.checkNotNull(event.getMessage());
+    AlterPartitionsMessage alterPartitionsMessage =
+        MetastoreEventsProcessor.getMessageDeserializer()
+            .getAlterPartitionsMessage(event.getMessage());
+    AlterPartitionsInfo alterPartitionsInfo = null;
+    try {
+      Iterator<Partition> partitionsIterator = Preconditions.checkNotNull(
+          alterPartitionsMessage.getPartitionObjs()).iterator();
+      List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter =
+          new ArrayList<>();
+      while (partitionsIterator.hasNext()) {
+        partitionsAfter.add(partitionsIterator.next());
+      }
+      org.apache.hadoop.hive.metastore.api.Table msTbl = Preconditions.checkNotNull(
+          alterPartitionsMessage.getTableObj());
+      boolean isTruncateOp = alterPartitionsMessage.getIsTruncateOp();
+      alterPartitionsInfo = new AlterPartitionsInfo(msTbl, partitionsAfter,
+          isTruncateOp);
+    } catch (Exception e) {
+      throw new MetastoreNotificationException(e);
+    }
+    return alterPartitionsInfo;
+  }
+
   /**
    *  This method extracts the partition name field from the
    *  notification event and returns it in the form of string.
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java 
b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
index 11a22c52b..447636ea1 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Hive3MetastoreShimBase.java
@@ -842,4 +842,43 @@ public class Hive3MetastoreShimBase {
   public static boolean validateColumnName(String name) {
     return MetaStoreUtils.validateColumnName(name);
   }
+
+  /**
+   * Fields extracted from an ALTER_PARTITIONS event (table, partitions, truncate flag).
+   */
+  public static class AlterPartitionsInfo {
+    private final org.apache.hadoop.hive.metastore.api.Table msTable;
+    private final List<org.apache.hadoop.hive.metastore.api.Partition> 
partitions;
+    private final boolean isTruncate;
+
+    public AlterPartitionsInfo(org.apache.hadoop.hive.metastore.api.Table 
msTable,
+        List<org.apache.hadoop.hive.metastore.api.Partition> partitions,
+        boolean isTruncate) {
+      this.msTable = msTable;
+      this.partitions = partitions;
+      this.isTruncate = isTruncate;
+    }
+
+    /**
+     * Returns the Thrift representation of the table.
+     */
+    public org.apache.hadoop.hive.metastore.api.Table getMsTable() {
+      return msTable;
+    }
+
+    /**
+     * Returns the altered Thrift partition objects carried by the event.
+     * May be empty; the constructor does not null-check this list.
+     */
+    public List<org.apache.hadoop.hive.metastore.api.Partition> 
getPartitions() {
+      return partitions;
+    }
+
+    /**
+     * Returns true if the partitions were altered as part of a truncate operation.
+     */
+    public boolean isTruncate() {
+      return isTruncate;
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index 2405e91f6..1055b212c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Map;
@@ -128,6 +129,7 @@ public class MetastoreEvents {
     DROP_DATABASE("DROP_DATABASE"),
     ALTER_DATABASE("ALTER_DATABASE"),
     ADD_PARTITION("ADD_PARTITION"),
+    ALTER_BATCH_PARTITIONS("ALTER_BATCH_PARTITIONS"),
     ALTER_PARTITION("ALTER_PARTITION"),
     ALTER_PARTITIONS("ALTER_PARTITIONS"),
     DROP_PARTITION("DROP_PARTITION"),
@@ -227,6 +229,8 @@ public class MetastoreEvents {
           return new DropPartitionEvent(catalogOpExecutor_, metrics, event);
         case ALTER_PARTITION:
           return new AlterPartitionEvent(catalogOpExecutor_, metrics, event);
+        case ALTER_PARTITIONS:
+          return new AlterPartitionsEvent(catalogOpExecutor_, metrics, event);
         case RELOAD:
           return new ReloadEvent(catalogOpExecutor_, metrics, event);
         case INSERT:
@@ -1367,6 +1371,52 @@ public class MetastoreEvents {
       return false;
     }
 
+    protected void processAlterPartitionEvent(boolean isTruncateOp,
+        List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter)
+        throws CatalogException, MetastoreNotificationException {
+      // Reload the whole table if it's a transactional table or materialized view.
+      // Materialized views are treated as a special case because it's possible to
+      // receive partition events on MVs, but they are regular views in Impala. That
+      // causes problems in the partition reloading logic, which expects an HdfsTable.
+      // Shared by ALTER_PARTITION (one partition) and ALTER_PARTITIONS (many).
+      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
+          || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
+        reloadTransactionalTable(partitionsAfter);
+      } else {
+        try {
+          // Load file metadata only if the storage descriptor of an altered partition
+          // differs from the sd of its HdfsPartition. If the alter was caused by a
+          // truncate then force load the file metadata.
+          FileMetadataLoadOpts fileMetadataLoadOpts =
+              isTruncateOp ? FileMetadataLoadOpts.FORCE_LOAD :
+                  FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
+          reloadPartitions(partitionsAfter, fileMetadataLoadOpts, getEventDesc(),
+              false);
+        } catch (CatalogException e) {
+          throw new MetastoreNotificationNeedsInvalidateException(
+              debugString("Refresh partitions on table {} failed. Event " +
+                  "processing cannot continue. Issue an invalidate command to reset " +
+                  "the event processor state.", getFullyQualifiedTblName()), e);
+        }
+      }
+    }
+
+    protected void reloadTransactionalTable(
+        List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter)
+        throws CatalogException {
+      boolean incrementalRefresh =
+          BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
+      if (incrementalRefresh) {
+        reloadPartitionsFromEvent(partitionsAfter, getEventDesc()
+            + " FOR TRANSACTIONAL TABLE");
+      } else {
+        boolean notSkipped = reloadTableFromCatalog(true);
+        if (!notSkipped) {
+          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        }
+      }
+    }
+
     @Override
     protected void process() throws MetastoreNotificationException, CatalogException {
       Timer.Context context = null;
@@ -2657,7 +2707,7 @@ public class MetastoreEvents {
 
     @Override
     protected MetastoreEventType getBatchEventType() {
-      return MetastoreEventType.ALTER_PARTITIONS;
+      return MetastoreEventType.ALTER_BATCH_PARTITIONS;
     }
 
     @Override
@@ -2721,32 +2771,7 @@ public class MetastoreEvents {
             getEventId());
         return;
       }
-      // Reload the whole table if it's a transactional table or materialized view.
-      // Materialized views are treated as a special case because it's possible to
-      // receive partition event on MVs, but they are regular views in Impala. That
-      // cause problems on the reloading partition logic which expects it to be a
-      // HdfsTable.
-      if (AcidUtils.isTransactionalTable(msTbl_.getParameters())
-          || MetaStoreUtils.isMaterializedViewTable(msTbl_)) {
-        reloadTransactionalTable();
-      } else {
-        // Refresh the partition that was altered.
-        Preconditions.checkNotNull(partitionAfter_);
-        try {
-          // load file metadata only if storage descriptor of partitionAfter_ differs
-          // from sd of HdfsPartition. If the alter_partition event type is of truncate
-          // then force load the file metadata.
-          FileMetadataLoadOpts fileMetadataLoadOpts =
-              isTruncateOp_ ? FileMetadataLoadOpts.FORCE_LOAD :
-                  FileMetadataLoadOpts.LOAD_IF_SD_CHANGED;
-          reloadPartitions(Arrays.asList(partitionAfter_), fileMetadataLoadOpts,
-              getEventDesc(), false);
-        } catch (CatalogException e) {
-          throw new MetastoreNotificationNeedsInvalidateException(
-              debugString("Refresh partition on table {} partition {} failed.",
-                  getFullyQualifiedTblName(), partName_), e);
-        }
-      }
+      processAlterPartitionEvent(isTruncateOp_, Arrays.asList(partitionAfter_));
     }
 
     @Override
@@ -2775,20 +2800,63 @@ public class MetastoreEvents {
           Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_, partitionAfter_)),
           partitionAfter_.getParameters());
     }
+  }
 
-    private void reloadTransactionalTable() throws CatalogException {
-      boolean incrementalRefresh =
-          BackendConfig.INSTANCE.getHMSEventIncrementalRefreshTransactionalTable();
-      if (incrementalRefresh) {
-        reloadPartitionsFromEvent(Collections.singletonList(partitionAfter_),
-            getEventDesc() + " FOR TRANSACTIONAL TABLE");
-      } else {
-        boolean notSkipped = reloadTableFromCatalog(true);
-        if (!notSkipped) {
-          metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
-        }
+  public static class AlterPartitionsEvent extends MetastoreTableEvent {
+    public static final String EVENT_TYPE = "ALTER_PARTITIONS";
+    // The partition objects after the alter operation, as parsed from the
+    // NotificationEvent. Only the first one is used for self/older-event checks.
+    private final List<org.apache.hadoop.hive.metastore.api.Partition> partitionsAfter_;
+    private final boolean isTruncateOp_;
+
+    /**
+     * Prevent instantiation from outside should use MetastoreEventFactory instead
+     */
+    private AlterPartitionsEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
+        NotificationEvent event) throws MetastoreNotificationException {
+      super(catalogOpExecutor, metrics, event);
+      Preconditions.checkState(getEventType().equals(
+          MetastoreEventType.ALTER_PARTITIONS));
+      try {
+        MetastoreShim.AlterPartitionsInfo alterPartitionsInfo =
+            MetastoreShim.getFieldsFromAlterPartitionsEvent(event);
+        msTbl_ = alterPartitionsInfo.getMsTable();
+        partitionsAfter_ = alterPartitionsInfo.getPartitions();
+        isTruncateOp_ = alterPartitionsInfo.isTruncate();
+      } catch (Exception e) {
+        throw new MetastoreNotificationException(
+            debugString("Unable to parse the alter partitions message"), e);
       }
     }
+
+    @Override
+    public void processTableEvent() throws MetastoreNotificationException,
+        CatalogException {
+      if (partitionsAfter_ == null || partitionsAfter_.isEmpty()) {
+        metrics_.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).inc();
+        warnLog("Not processing the alter partitions event {} as no partitions are " +
+            "received in the event.", getEventId());
+        return;
+      }
+      if (isSelfEvent()) {
+        infoLog("Not processing the event as it is a self-event");
+        return;
+      }
+
+      if (isOlderEvent(partitionsAfter_.get(0))) {
+        infoLog("Not processing the alter partitions event {} as it is an older event",
+            getEventId());
+        return;
+      }
+      processAlterPartitionEvent(isTruncateOp_, partitionsAfter_);
+    }
+
+    @Override
+    public SelfEventContext getSelfEventContext() {
+      return new SelfEventContext(dbName_, tblName_,
+          Arrays.asList(getTPartitionSpecFromHmsPartition(msTbl_,
+              partitionsAfter_.get(0))), partitionsAfter_.get(0).getParameters());
+    }
   }
 
   /**
diff --git a/fe/src/test/resources/hive-site.xml.py 
b/fe/src/test/resources/hive-site.xml.py
index d3a3609b4..d2bf9a9d9 100644
--- a/fe/src/test/resources/hive-site.xml.py
+++ b/fe/src/test/resources/hive-site.xml.py
@@ -95,6 +95,11 @@ elif VARIANT == 'housekeeping_on':
   CONFIG.update({
     'hive.metastore.housekeeping.threads.on': 'true',
   })
+elif VARIANT == 'events_config_change':
+  # Enable HIVE-27746 v2 notifications so HMS emits batched ALTER_PARTITIONS events
+  CONFIG.update({
+    'hive.metastore.alterPartitions.notification.v2.enabled': 'true',
+  })
 
 # HBase-related configs.
 # Impala processes need to connect to zookeeper on INTERNAL_LISTEN_HOST for 
HBase.
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 5d3b11b26..7a0f7031d 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -39,6 +39,8 @@ from tests.util.iceberg_util import IcebergCatalogs
 
 HIVE_SITE_HOUSEKEEPING_ON =\
     getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on'
+HIVE_SITE_ALTER_PARTITIONS_EVENT =\
+    getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-events-config'
 TRUNCATE_TBL_STMT = 'truncate table'
 # The statestore heartbeat and topic update frequency (ms). Set low for testing.
 STATESTORE_RPC_FREQUENCY_MS = 100
@@ -1694,6 +1696,67 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
         self.client.execute("""DROP DATABASE {} CASCADE""".format(unique_database))
         self.client.execute("""CREATE DATABASE {}""".format(unique_database))
 
+  @SkipIf.is_test_jdk
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1 "
+                  "--debug_actions=catalogd_event_processing_delay:SLEEP@1000",
+    hive_conf_dir=HIVE_SITE_ALTER_PARTITIONS_EVENT)
+  def test_alter_partitions_event_from_metastore(self, unique_database):
+    tbl = unique_database + ".test_alter_partitions"
+    self.client.execute("create table {} (id int) partitioned by (year int)"
+      .format(tbl))
+
+    def _verify_alter_partitions_event(events):
+      event_found = False
+      for event in events:
+        if event.eventType == "ALTER_PARTITIONS":
+          event_found = True
+        else:
+          logging.debug("Found " + str(event))
+      return event_found
+
+    # Create three partitions so the statements below alter all of them at once.
+    self.client.execute(
+      "insert into {} partition(year) values (0,2024), (1,2023), (2,2022)"
+      .format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self, 10)
+
+    # Case-I: compute stats from hive
+    parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed")
+    batch_events_before = EventProcessorUtils.get_int_metric("batch-events-created")
+    self.run_stmt_in_hive("analyze table {} compute statistics".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self, 10)
+    batch_events_after = EventProcessorUtils.get_int_metric("batch-events-created")
+    parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed")
+    assert batch_events_after == batch_events_before  # verify there are no new batches
+    assert parts_refreshed_after == parts_refreshed_before + 3
+
+    # Case-II: compute stats from Impala; the resulting event is expected to be skipped
+    last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
+    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    self.client.execute("compute stats {}".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self, 10)
+    events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
+    # There will be COMMIT_TXN, ALLOC_WRITE_ID_EVENT, ALTER_PARTITIONS in any order
+    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    assert _verify_alter_partitions_event(events)
+    assert events_skipped_after > events_skipped_before
+
+    # Case-III: truncate table from Impala; the resulting event is expected to be skipped
+    last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
+    events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    self.client.execute("truncate table {}".format(tbl))
+    EventProcessorUtils.wait_for_event_processing(self, 10)
+    events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
+    events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
+    assert _verify_alter_partitions_event(events)
+    assert events_skipped_after > events_skipped_before
+
+    # Case-IV: Truncate table from Hive currently generates individual ALTER_PARTITION
+    # events. HIVE-28668 will address it.
+
 
 @SkipIfFS.hive
 class TestEventProcessingWithImpala(TestEventProcessingCustomConfigsBase):

Reply via email to