This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ab6c9467f IMPALA-12831: Fix HdfsTable.toMinimalTCatalogObject() failed 
by concurrent modification
ab6c9467f is described below

commit ab6c9467f6347671b971dbce4c640bea032b6ed9
Author: stiga-huang <[email protected]>
AuthorDate: Mon Feb 26 15:59:31 2024 +0800

    IMPALA-12831: Fix HdfsTable.toMinimalTCatalogObject() failed by concurrent 
modification
    
    HdfsTable.toMinimalTCatalogObject() is not always invoked while holding
    the table lock, e.g. when invalidating a table, we could replace an
    HdfsTable instance with an IncompleteTable instance. We then invoke
    HdfsTable.toMinimalTCatalogObject() to get the removed catalog object.
    However, the HdfsTable instance could be modified in the meantime by a
    concurrent DDL/DML that would reload it, e.g. a REFRESH statement. This
    causes HdfsTable.toMinimalTCatalogObject() to fail with a
    ConcurrentModificationException on the column/partition list.
    
    This patch fixes the issue by trying to acquire the table read lock in
    HdfsTable.toMinimalTCatalogObject(). If that fails, the partition ids and
    names won't be returned. Also updates the method to not collect the
    column list since it's unused.
    
    Tests
     - Added e2e test
     - Ran CORE tests
    
    Change-Id: Ie9f8e65c0bd24000241eedf8ca765c1e4e14fdb3
    Reviewed-on: http://gerrit.cloudera.org:8080/21072
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  | 58 +++++++++++++++++-----
 .../main/java/org/apache/impala/catalog/Table.java |  4 +-
 tests/custom_cluster/test_concurrent_ddls.py       | 23 +++++++++
 3 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java 
b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index f3c3e3132..0468bbd9e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -207,6 +207,8 @@ public class HdfsTable extends Table implements FeFsTable {
   public static final String LOAD_DURATION_FILE_METADATA_ALL_PARTITIONS =
       "load-duration.all-partitions.file-metadata";
 
+  private static final THdfsPartition FAKE_THDFS_PARTITION = new 
THdfsPartition();
+
   // string to indicate NULL. set in load() from table properties
   private String nullColumnValue_;
 
@@ -2129,7 +2131,7 @@ public class HdfsTable extends Table implements FeFsTable 
{
   }
 
   /**
-   * Just likes super.toMinimalTCatalogObject() but also contains the minimal 
catalog
+   * Just likes super.toMinimalTCatalogObject() but try to add the minimal 
catalog
    * objects of partitions in the returned result.
    */
   @Override
@@ -2138,19 +2140,49 @@ public class HdfsTable extends Table implements 
FeFsTable {
     if (!BackendConfig.INSTANCE.isIncrementalMetadataUpdatesEnabled()) {
       return catalogObject;
     }
-    catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
-    THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
-        nullPartitionKeyValue_, nullColumnValue_,
-        /*idToPartition=*/ new HashMap<>(),
-        /*prototypePartition=*/ new THdfsPartition());
-    for (HdfsPartition part : partitionMap_.values()) {
-      hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
+    // Try adding the partition ids and names if we can acquire the read lock.
+    // Use tryReadLock() to avoid being blocked by concurrent DDLs. It won't 
result in
+    // correctness issues if we return the result without the partition ids 
and names.
+    // The results are mainly used in two scenarios:
+    // 1. The result is added to catalog deleteLog for sending delete updates 
for
+    //    partitions. The delete update for the table is always sent so 
coordinators are
+    //    able to invalidate the HdfsTable object. This enforces the 
correctness.
+    //    However, not sending the deletes causes a leak of the topic values in
+    //    statestore if the topic keys (tableName+partName) are not reused 
anymore.
+    //    Note that topic keys are never deleted in the TopicEntryMap of 
statestore even
+    //    if we send the delete updates. So we already have a leak on catalog 
topic keys.
+    //    We can revisit this if we found statestore has memory issues.
+    // 2. The result is used in DDL/DML response for a removed/invalidated 
table.
+    //    Coordinators can still invalidate its cache since the table is sent 
(in the
+    //    parent implementation).
+    //    However, LocalCatalog coordinators can't immediately invalidate 
partitions of
+    //    a removed table. They will be cleared by the cache eviction policy 
since the
+    //    partitions of deleted tables won't be used anymore.
+    // TODO: synchronize the access on the partition map by using a 
finer-grained lock
+    if (!tryReadLock()) {
+      LOG.warn("Not returning the partition ids and names of table {} since 
not " +
+          "holding the table read lock", getFullName());
+      return catalogObject;
+    }
+    try {
+      catalogObject.getTable().setTable_type(TTableType.HDFS_TABLE);
+      // Column names are unused by the consumers so use an empty list here.
+      THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_,
+          /*colNames=*/ Collections.emptyList(),
+          nullPartitionKeyValue_, nullColumnValue_,
+          /*idToPartition=*/ new HashMap<>(),
+          /*prototypePartition=*/ FAKE_THDFS_PARTITION);
+      for (HdfsPartition part : partitionMap_.values()) {
+        hdfsTable.partitions.put(part.getId(), part.toMinimalTHdfsPartition());
+      }
+      hdfsTable.setHas_full_partitions(false);
+      // The minimal catalog object of partitions contain the partition names.
+      hdfsTable.setHas_partition_names(true);
+      catalogObject.getTable().setHdfs_table(hdfsTable);
+      return catalogObject;
+    } finally {
+      releaseReadLock();
     }
-    hdfsTable.setHas_full_partitions(false);
-    // The minimal catalog object of partitions contain the partition names.
-    hdfsTable.setHas_partition_names(true);
-    catalogObject.getTable().setHdfs_table(hdfsTable);
-    return catalogObject;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java 
b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 4dbd4fcfe..6299dfc89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -86,10 +86,10 @@ public abstract class Table extends CatalogObjectImpl 
implements FeTable {
   protected final String full_name_;
   protected final String owner_;
   protected TAccessLevel accessLevel_ = TAccessLevel.READ_WRITE;
-  // Lock protecting this table. A read lock must be table when we are 
serializing
+  // Lock protecting this table. A read lock must be held when we are 
serializing
   // the table contents over thrift (e.g when returning the table to clients 
over thrift
   // or when topic-update thread serializes the table in the topic update)
-  // A write lock must be table when the table is being modified (e.g. DDLs or 
refresh)
+  // A write lock must be held when the table is being modified (e.g. DDLs or 
refresh)
   private final ReentrantReadWriteLock tableLock_ = new ReentrantReadWriteLock(
       true /*fair ordering*/);
   private final ReadLock readLock_ = tableLock_.readLock();
diff --git a/tests/custom_cluster/test_concurrent_ddls.py 
b/tests/custom_cluster/test_concurrent_ddls.py
index 4052de682..3630db369 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -197,3 +197,26 @@ class TestConcurrentDdls(CustomClusterTestSuite):
         dump_server_stacktraces()
         assert False, "INVALIDATE METADATA timeout in 60s!"
     pool.terminate()
+
+  @CustomClusterTestSuite.with_args(
+    catalogd_args="--enable_incremental_metadata_updates=true")
+  def test_concurrent_invalidate_metadata_with_refresh(self, unique_database):
+    # Create a wide table with some partitions
+    tbl = unique_database + ".wide_tbl"
+    create_stmt = "create table {} (".format(tbl)
+    for i in range(600):
+      create_stmt += "col{} int, ".format(i)
+    create_stmt += "col600 int) partitioned by (p int) stored as textfile"
+    self.execute_query(create_stmt)
+    for i in range(10):
+      self.execute_query("alter table {} add partition (p={})".format(tbl, i))
+
+    refresh_stmt = "refresh " + tbl
+    refresh_handle = self.client.execute_async(refresh_stmt)
+    for i in range(10):
+      self.execute_query("invalidate metadata " + tbl)
+      # Always keep a concurrent REFRESH statement running
+      refresh_state = self.client.get_state(refresh_handle)
+      if refresh_state == self.client.QUERY_STATES['FINISHED']\
+          or refresh_state == self.client.QUERY_STATES['EXCEPTION']:
+        refresh_handle = self.client.execute_async(refresh_stmt)

Reply via email to