This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit b9c2e00a6ba1eaf1ed3f63013be60408409152b6
Author: stiga-huang <[email protected]>
AuthorDate: Fri Mar 1 13:08:31 2024 +0800

    IMPALA-12855: Fix NPE in firing RELOAD events when the partition doesn't exist
    
    When --enable_reload_events is set to true, catalogd will fire RELOAD
    events for INVALIDATE/REFRESH statements. When the RELOAD event is fired
    successfully for a REFRESH statement, we also update lastRefreshEventId
    of the table/partition. This part could hit NullPointerException when
    the partition is dropped by concurrent DDLs.
    
    This patch skips updating lastRefreshEventId if the partition no longer
    exists. Note that ideally we should hold the table lock of REFRESH until
    we finish firing the RELOAD events and updating lastRefreshEventId, so
    that no concurrent operation can drop the partition. However, when the table is
    loaded from scratch, we don't actually hold the table write lock. We
    just load the table and take a read lock to get the thrift object. The
    partition could still be dropped concurrently after the load and before
    taking the read lock. So ignoring missing partitions is a simpler
    solution.
    
    Refactors some code in fireReloadEventAndUpdateRefreshEventId to reduce
    indentation and to avoid acquiring the table lock if no events are fired.
    Adds error messages to some Precondition checks in methods used by this
    feature. Also refactors Table.getFullName() to return a precomputed value
    instead of constructing the result on every call. Improves the logs
    emitted when a partition is not reloaded for an event.
    
    Tests:
     - Add e2e test
    
    Change-Id: I01af3624bf7cf5cd69935cffa28d54f6a6807504
    Reviewed-on: http://gerrit.cloudera.org:8080/21096
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../org/apache/impala/compat/MetastoreShim.java    |  2 +-
 .../java/org/apache/impala/catalog/FeFsTable.java  | 10 ++-
 .../main/java/org/apache/impala/catalog/Table.java |  4 +-
 .../impala/catalog/events/MetastoreEvents.java     |  7 +-
 .../apache/impala/service/CatalogOpExecutor.java   | 82 ++++++++++++----------
 tests/custom_cluster/test_events_custom_configs.py | 27 +++++++
 6 files changed, 86 insertions(+), 46 deletions(-)

diff --git 
a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java 
b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 2f11da0c3..c004f8b9a 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -545,7 +545,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
       LOG.error("FireEventResponse does not have event ids set for table 
{}.{}. This "
               + "may cause the table to unnecessarily be refreshed when the " +
               "refresh/invalidate event is received.", dbName, tableName);
-      return Collections.EMPTY_LIST;
+      return Collections.emptyList();
     }
     return response.getEventIds();
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 47847348f..e5853251f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -595,7 +595,7 @@ public interface FeFsTable extends FeTable {
       Set<String> keys = new HashSet<>();
       for (FieldSchema fs: table.getMetaStoreTable().getPartitionKeys()) {
         for (TPartitionKeyValue kv: partitionSpec) {
-          if (fs.getName().toLowerCase().equals(kv.getName().toLowerCase())) {
+          if (fs.getName().equalsIgnoreCase(kv.getName())) {
             targetValues.add(kv.getValue());
             // Same key was specified twice
             if (!keys.add(kv.getName().toLowerCase())) {
@@ -615,7 +615,9 @@ public interface FeFsTable extends FeTable {
       // match the values being searched for.
       for (PrunablePartition partition: table.getPartitions()) {
         List<LiteralExpr> partitionValues = partition.getPartitionValues();
-        Preconditions.checkState(partitionValues.size() == 
targetValues.size());
+        Preconditions.checkState(partitionValues.size() == targetValues.size(),
+            "Partition values not match in table %s: %s != %s",
+            table.getFullName(), partitionValues.size(), targetValues.size());
         boolean matchFound = true;
         for (int i = 0; i < targetValues.size(); ++i) {
           String value;
@@ -623,7 +625,9 @@ public interface FeFsTable extends FeTable {
             value = table.getNullPartitionKeyValue();
           } else {
             value = partitionValues.get(i).getStringValue();
-            Preconditions.checkNotNull(value);
+            Preconditions.checkNotNull(value,
+                "Got null string from non-null partition value of table %s: 
i=%s",
+                table.getFullName(), i);
             // See IMPALA-252: we deliberately map empty strings on to
             // NULL when they're in partition columns. This is for
             // backwards compatibility with Hive, and is clearly broken.
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java 
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 15ec5784b..4dbd4fcfe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -83,6 +83,7 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   protected org.apache.hadoop.hive.metastore.api.Table msTable_;
   protected final Db db_;
   protected final String name_;
+  protected final String full_name_;
   protected final String owner_;
   protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
   // Lock protecting this table. A read lock must be table when we are 
serializing
@@ -224,6 +225,7 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
     msTable_ = msTable;
     db_ = db;
     name_ = name.toLowerCase();
+    full_name_ = (db_ != null ? db_.getName() + "." : "") + name_;
     owner_ = owner;
     tableStats_ = new TTableStats(-1);
     tableStats_.setTotal_file_bytes(-1);
@@ -850,7 +852,7 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   public String getName() { return name_; }
 
   @Override // FeTable
-  public String getFullName() { return (db_ != null ? db_.getName() + "." : 
"") + name_; }
+  public String getFullName() { return full_name_; }
 
   @Override // FeTable
   public TableName getTableName() {
diff --git 
a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java 
b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
index b5b1f6db0..a344e2231 100644
--- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
+++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java
@@ -1055,7 +1055,8 @@ public class MetastoreEvents {
         throws CatalogException {
       try {
         int numPartsRefreshed = 
catalogOpExecutor_.reloadPartitionsIfExist(getEventId(),
-            dbName_, tblName_, partitions, reason, fileMetadataLoadOpts);
+            getEventType().toString(), dbName_, tblName_, partitions, reason,
+            fileMetadataLoadOpts);
         if (numPartsRefreshed > 0) {
           
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
               .inc(numPartsRefreshed);
@@ -1098,8 +1099,8 @@ public class MetastoreEvents {
         FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException {
       try {
         int numPartsRefreshed = 
catalogOpExecutor_.reloadPartitionsFromNamesIfExists(
-            getEventId(), dbName_, tblName_, partitionNames, reason,
-            fileMetadataLoadOpts);
+            getEventId(), getEventType().toString(), dbName_, tblName_, 
partitionNames,
+            reason, fileMetadataLoadOpts);
         if (numPartsRefreshed > 0) {
           
metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_PARTITION_REFRESHES)
                   .inc(numPartsRefreshed);
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3e620aa51..1e5aa1802 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -4717,7 +4717,8 @@ public class CatalogOpExecutor {
    * Reloads the given partitions if they exist and have not been removed 
since the event
    * was generated.
    *
-   * @param eventId EventId being processed.
+   * @param eventId EventId of the event being processed.
+   * @param eventType EventType of the event being processed.
    * @param dbName Database name for the partition
    * @param tblName Table name for the partition
    * @param partsFromEvent List of {@link Partition} objects from the events 
to be
@@ -4727,8 +4728,8 @@ public class CatalogOpExecutor {
    * @return the number of partitions which were reloaded. If the table does 
not exist,
    * returns 0. Some partitions could be skipped if they don't exist anymore.
    */
-  public int reloadPartitionsIfExist(long eventId, String dbName, String 
tblName,
-      List<Partition> partsFromEvent, String reason,
+  public int reloadPartitionsIfExist(long eventId, String eventType, String 
dbName,
+      String tblName, List<Partition> partsFromEvent, String reason,
       FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException {
     List<String> partNames = new ArrayList<>();
     Table table = catalog_.getTable(dbName, tblName);
@@ -4739,8 +4740,8 @@ public class CatalogOpExecutor {
             part.getValues(), null));
       }
     }
-    return reloadPartitionsFromNamesIfExists(eventId, dbName, tblName, 
partNames,
-        reason, fileMetadataLoadOpts);
+    return reloadPartitionsFromNamesIfExists(eventId, eventType, dbName, 
tblName,
+        partNames, reason, fileMetadataLoadOpts);
   }
 
   /**
@@ -4756,8 +4757,8 @@ public class CatalogOpExecutor {
    * @return the number of partitions which were reloaded. If the table does 
not exist,
    * returns 0. Some partitions could be skipped if they don't exist anymore.
    */
-  public int reloadPartitionsFromNamesIfExists (long eventId, String dbName,
-      String tblName, List<String> partNames, String reason,
+  public int reloadPartitionsFromNamesIfExists(long eventId, String eventType,
+      String dbName, String tblName, List<String> partNames, String reason,
       FileMetadataLoadOpts fileMetadataLoadOpts) throws CatalogException {
     Table table = catalog_.getTable(dbName, tblName);
     if (table == null) {
@@ -4765,9 +4766,8 @@ public class CatalogOpExecutor {
           .getDeleteEventLog();
       if (deleteEventLog
           .wasRemovedAfter(eventId, DeleteEventLog.getTblKey(dbName, 
tblName))) {
-        LOG.info(
-            "Not reloading the partition of table {} since it was removed "
-                + "later in catalog", new TableName(dbName, tblName));
+        LOG.info("EventId: {} EventType: {} Not reloading the partition of 
table {}.{} " +
+            "since it was removed later in catalog", eventId, eventType, 
dbName, tblName);
         return 0;
       } else {
         throw new TableNotFoundException(
@@ -4775,8 +4775,8 @@ public class CatalogOpExecutor {
       }
     }
     if (table instanceof IncompleteTable) {
-      LOG.info("Table {} is not loaded. Skipping drop partition event {}",
-          table.getFullName(), eventId);
+      LOG.info("Table {} is not loaded. Skipping {} event {}",
+          table.getFullName(), eventType, eventId);
       return 0;
     }
     if (!(table instanceof HdfsTable)) {
@@ -4791,9 +4791,9 @@ public class CatalogOpExecutor {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
       if (syncToLatestEventId && table.getLastSyncedEventId() >= eventId) {
-        LOG.info("Not reloading partition from event id: {} since table {} is 
already "
-                + "synced till event id {}", eventId, table.getFullName(),
-            table.getLastSyncedEventId());
+        LOG.info("EventId: {} EventType: {} Not reloading partition since 
table {} is " +
+                "already synced till event id {}",
+            eventId, eventType, table.getFullName(), 
table.getLastSyncedEventId());
         return 0;
       }
       HdfsTable hdfsTable = (HdfsTable) table;
@@ -4808,8 +4808,8 @@ public class CatalogOpExecutor {
       hdfsTable.setCatalogVersion(newCatalogVersion);
       return numOfPartsReloaded;
     } catch (TableLoadingException e) {
-      LOG.info("Could not reload {} partitions of table {}", partNames.size(),
-          table.getFullName(), e);
+      LOG.info("EventId: {} EventType: {} Could not reload {} partitions of 
table {}",
+          eventId, eventType, partNames.size(), table.getFullName(), e);
     } catch (InternalException e) {
       errorOccured = true;
       throw new CatalogException(
@@ -6668,8 +6668,9 @@ public class CatalogOpExecutor {
       Reference<Boolean> dbWasAdded = new Reference<Boolean>(false);
       // Thrift representation of the result of the invalidate/refresh 
operation.
       TCatalogObject updatedThriftTable = null;
-      TableName tblName = TableName.fromThrift(req.getTable_name());
+      // Result table of the invalidate/refresh operation.
       Table tbl = null;
+      TableName tblName = TableName.fromThrift(req.getTable_name());
       if (!req.isIs_refresh()) {
         // For INVALIDATE METADATA <db>.<table>, the db might be unloaded.
         // So we can't update 'tbl' here.
@@ -6786,39 +6787,44 @@ public class CatalogOpExecutor {
    * Helper class for refresh event.
    * This class invokes metastore shim's fireReloadEvent to fire event to HMS
    * and update the last refresh event id in the cache
-   * @param req - request object for TResetMetadataRequest.
-   * @param tblName
-   * @param tbl
    */
   private void fireReloadEventAndUpdateRefreshEventId(
       TResetMetadataRequest req, TableName tblName, Table tbl) {
     List<String> partVals = null;
     if (req.isSetPartition_spec()) {
       partVals = req.getPartition_spec().stream().
-          map(partSpec -> partSpec.getValue()).collect(Collectors.toList());
+          map(TPartitionKeyValue::getValue).collect(Collectors.toList());
     }
     try {
       List<Long> eventIds = MetastoreShim.fireReloadEventHelper(
           catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, 
tblName.getDb(),
           tblName.getTbl(), Collections.emptyMap());
-      if (req.isIs_refresh()) {
-        if (catalog_.tryLock(tbl, true, 600000)) {
-          if (!eventIds.isEmpty()) {
-            if (req.isSetPartition_spec()) {
-              HdfsPartition partition = ((HdfsTable) tbl)
-                  
.getPartitionFromThriftPartitionSpec(req.getPartition_spec());
-              HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
-              partBuilder.setLastRefreshEventId(eventIds.get(0));
-              ((HdfsTable) tbl).updatePartition(partBuilder);
-            } else {
-              tbl.setLastRefreshEventId(eventIds.get(0));
-            }
-          }
+      LOG.info("Fired {} RELOAD events for table {}: {}", eventIds.size(),
+          tbl.getFullName(), StringUtils.join(",", eventIds));
+      // Update the lastRefreshEventId accordingly
+      if (!req.isIs_refresh() || eventIds.isEmpty()) return;
+      if (!catalog_.tryLock(tbl, true, 600000)) {
+        LOG.warn("Couldn't obtain a version lock for the table: {}. " +
+                "Self events may go undetected in that case",
+            tbl.getFullName());
+        return;
+      }
+      if (req.isSetPartition_spec()) {
+        HdfsTable hdfsTbl = (HdfsTable) tbl;
+        HdfsPartition partition = hdfsTbl
+            .getPartitionFromThriftPartitionSpec(req.getPartition_spec());
+        if (partition != null) {
+          HdfsPartition.Builder partBuilder = new 
HdfsPartition.Builder(partition);
+          partBuilder.setLastRefreshEventId(eventIds.get(0));
+          hdfsTbl.updatePartition(partBuilder);
         } else {
-          LOG.warn(String.format("Couldn't obtain a version lock for the 
table: %s. " +
-              "Self events may go undetected in that case",
-              tbl.getName()));
+          LOG.warn("Partition {} no longer exists in table {}. It might be " +
+              "dropped by a concurrent operation.",
+              FeCatalogUtils.getPartitionName(hdfsTbl, partVals),
+              hdfsTbl.getFullName());
         }
+      } else {
+        tbl.setLastRefreshEventId(eventIds.get(0));
       }
     } catch (TException | CatalogException e) {
       LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR,
diff --git a/tests/custom_cluster/test_events_custom_configs.py 
b/tests/custom_cluster/test_events_custom_configs.py
index eda2e6163..dafcb5854 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -21,6 +21,7 @@ from os import getenv
 import pytest
 
 
+from beeswaxd.BeeswaxService import QueryState
 from hive_metastore.ttypes import FireEventRequest
 from hive_metastore.ttypes import FireEventRequestData
 from hive_metastore.ttypes import InsertEventRequestData
@@ -327,6 +328,32 @@ class 
TestEventProcessingCustomConfigs(CustomClusterTestSuite):
     EventProcessorUtils.wait_for_event_processing(self)
     assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
 
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--enable_reload_events=true")
+  def test_reload_events_with_transient_partitions(self, unique_database):
+    tbl = unique_database + ".tbl"
+    create_stmt = "create table {} (i int) partitioned by(p int)".format(tbl)
+    add_part_stmt = "alter table {} add if not exists 
partition(p=0)".format(tbl)
+    drop_part_stmt = "alter table {} drop if exists partition(p=0)".format(tbl)
+    refresh_stmt = "refresh {} partition(p=0)".format(tbl)
+    end_states = [self.client.QUERY_STATES['FINISHED'],
+                  self.client.QUERY_STATES['EXCEPTION']]
+
+    self.execute_query(create_stmt)
+    self.execute_query(add_part_stmt)
+    # Run REFRESH partition in the background so we can drop the partition 
concurrently.
+    refresh_handle = self.client.execute_async(refresh_stmt)
+    # Before IMPALA-12855, REFRESH usually fails in 2-3 rounds.
+    for i in range(100):
+      self.execute_query(drop_part_stmt)
+      refresh_state = self.wait_for_any_state(refresh_handle, end_states, 10)
+      assert refresh_state == self.client.QUERY_STATES['FINISHED'],\
+          "REFRESH state: {}. Error log: {}".format(
+              QueryState._VALUES_TO_NAMES[refresh_state],
+              self.client.get_log(refresh_handle))
+      self.execute_query(add_part_stmt)
+      refresh_handle = self.client.execute_async(refresh_stmt)
+
   @CustomClusterTestSuite.with_args(
     catalogd_args="--hms_event_polling_interval_s=10"
                   " --enable_skipping_older_events=true"

Reply via email to