This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ab92a300f IMPALA-13631: alterTableOrViewRename shouldn't hold catalog 
versionLock during external RPCs
ab92a300f is described below

commit ab92a300fc9363895418690b87b4a73df7a7202d
Author: stiga-huang <[email protected]>
AuthorDate: Wed Apr 16 18:30:05 2025 +0800

    IMPALA-13631: alterTableOrViewRename shouldn't hold catalog versionLock 
during external RPCs
    
    Catalog versionLock is a lock used to synchronize reads/writes of
    catalogVersion. It can be used to perform atomic bulk catalog operations
    since catalogVersion cannot change externally while the lock is being
    held. All other catalog operations will be blocked if the current thread
    holds the lock. So it shouldn't be held for a long time, especially when
    the current thread is invoking external RPCs for a table.
    
    CatalogOpExecutor.alterTable() is one place that could hold the lock for
    a long time. If the ALTER operation is a RENAME, it holds the lock until
    alterTableOrViewRename() finishes. HMS RPCs are invoked in this method
to perform the operation, which might take an unpredictable amount of time. The
    motivation of holding this lock is that RENAME is implemented as a DROP
    + ADD in the catalog cache. So this operation can be atomic. However,
    that doesn't mean we need the lock before operating the cache in
    CatalogServiceCatalog.renameTable(). We actually acquire the lock again
    in this method. So no need to keep holding the lock when invoking HMS
    RPCs.
    
    This patch removes holding the lock in alterTableOrViewRename().
    
    Tests
     - Added e2e test for concurrent rename operations.
     - Also added some rename operations in test_concurrent_ddls.py
    
    Change-Id: Ie5f443b1e167d96024b717ce70ca542d7930cb4b
    Reviewed-on: http://gerrit.cloudera.org:8080/22789
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
    Reviewed-by: Joe McDonnell <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 .../apache/impala/service/CatalogOpExecutor.java   | 22 +++------
 tests/custom_cluster/test_concurrent_ddls.py       |  4 ++
 tests/metadata/test_ddl.py                         | 54 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4461c03f5..565ee58b2 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1213,28 +1213,21 @@ public class CatalogOpExecutor {
     // version to the table being altered.
     InProgressTableModification modification =
         new InProgressTableModification(catalog_, tbl);
+    catalog_.getLock().writeLock().unlock();
     modification.addCatalogServiceIdentifiersToTable();
     final Timer.Context context
         = tbl.getMetrics().getTimer(Table.ALTER_DURATION_METRIC).time();
     try {
       if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
           || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
-        // RENAME is implemented as an ADD + DROP, so we need to execute it as 
we hold
-        // the catalog lock.
-        try {
-          alterTableOrViewRename(tbl,
-              
TableName.fromThrift(params.getRename_params().getNew_table_name()),
-              modification, wantMinimalResult, response, catalogTimeline);
-          modification.validateInProgressModificationComplete();
-          return;
-        } finally {
-          // release the version taken in the tryLock call above
-          catalog_.getLock().writeLock().unlock();
-        }
+        alterTableOrViewRename(tbl,
+            
TableName.fromThrift(params.getRename_params().getNew_table_name()),
+            modification, wantMinimalResult, response, catalogTimeline);
+        modification.validateInProgressModificationComplete();
+        return;
       }
 
       String responseSummaryMsg = null;
-      catalog_.getLock().writeLock().unlock();
 
       if (tbl instanceof KuduTable && altersKuduTable(params.getAlter_type())) 
{
         alterKuduTable(params, response, (KuduTable) tbl, modification, 
wantMinimalResult,
@@ -5687,8 +5680,7 @@ public class CatalogOpExecutor {
   private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
       InProgressTableModification modification, boolean wantMinimalResult,
       TDdlExecResponse response, EventSequence catalogTimeline) throws 
ImpalaException {
-    Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread()
-        && catalog_.getLock().isWriteLockedByCurrentThread());
+    Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread());
     TableName tableName = oldTbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl =
         oldTbl.getMetaStoreTable().deepCopy();
diff --git a/tests/custom_cluster/test_concurrent_ddls.py 
b/tests/custom_cluster/test_concurrent_ddls.py
index 1eef3b76e..ef04ae0e8 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -110,6 +110,10 @@ class TestConcurrentDdls(CustomClusterTestSuite):
         # Below queries could fail if running with invalidate metadata 
concurrently
         "alter table %s_part add partition (j=1)" % tbl_name,
         "alter table %s_part add partition (j=2)" % tbl_name,
+        "alter table {0} rename to {0}_2".format(tbl_name),
+        "alter table {0}_part rename to {0}_part2".format(tbl_name),
+        "alter table {0}_2 rename to {0}".format(tbl_name),
+        "alter table {0}_part2 rename to {0}_part".format(tbl_name),
         "invalidate metadata %s_part" % tbl_name,
         "refresh %s" % tbl_name,
         "refresh %s_part" % tbl_name,
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index b2a5b7682..63175b55e 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -22,6 +22,9 @@ import itertools
 import pytest
 import re
 import time
+import threading
+from multiprocessing.pool import ThreadPool
+from multiprocessing import TimeoutError
 
 from copy import deepcopy
 from tests.metadata.test_ddl_base import TestDdlBase
@@ -52,6 +55,7 @@ from tests.util.filesystem_utils import (
     FILESYSTEM_NAME)
 from tests.common.impala_cluster import ImpalaCluster
 from tests.util.filesystem_utils import FILESYSTEM_PREFIX
+from tests.util.shell_util import dump_server_stacktraces
 
 
 def get_trash_path(bucket, path):
@@ -479,6 +483,56 @@ class TestDdlStatements(TestDdlBase):
     self.run_test_case('QueryTest/alter-table-set-column-stats', vector,
         use_db=unique_database, 
multiple_impalad=self._use_multiple_impalad(vector))
 
+  @UniqueDatabase.parametrize(num_dbs=2)
+  def test_concurrent_alter_table_rename(self, vector, unique_database):
+    test_self = self
+
+    class ThreadLocalClient(threading.local):
+      def __init__(self):
+        self.client = test_self.create_impala_client_from_vector(vector)
+
+    pool = ThreadPool(processes=8)
+    tlc = ThreadLocalClient()
+
+    def run_rename(i):
+      if i % 2 == 0:
+        tlc.client.execute("set sync_ddl=1")
+      is_partitioned = i % 4 < 2
+      tbl_name = "{}.tbl_{}".format(unique_database, i)
+      tlc.client.execute("create table {}(i int){}".format(
+          tbl_name, "partitioned by(p int)" if is_partitioned else ""))
+      if i % 8 < 4:
+        # Rename inside the same db
+        new_tbl_name = tbl_name + "_new"
+      else:
+        # Move to another db
+        new_tbl_name = "{}2.tbl_{}".format(unique_database, i)
+      stmts = [
+        "alter table {} rename to {}".format(tbl_name, new_tbl_name),
+        "alter table {} rename to {}".format(new_tbl_name, tbl_name),
+      ]
+      # Move the table back and forth in several rounds
+      for _ in range(4):
+        for query in stmts:
+          # Run the query asynchronously to avoid getting stuck by it
+          handle = tlc.client.execute_async(query)
+          is_finished = tlc.client.wait_for_finished_timeout(handle, 
timeout=60)
+          assert is_finished, "Query timeout(60s): " + query
+          tlc.client.close_query(handle)
+      return True
+
+    # Run renames in parallel
+    NUM_ITERS = 16
+    worker = [None] * (NUM_ITERS + 1)
+    for i in range(1, NUM_ITERS + 1):
+      worker[i] = pool.apply_async(run_rename, (i,))
+    for i in range(1, NUM_ITERS + 1):
+      try:
+        assert worker[i].get(timeout=100)
+      except TimeoutError:
+        dump_server_stacktraces()
+        assert False, "Timeout in thread run_ddls(%d)" % i
+
   @SkipIfFS.hbase
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_alter_hbase_set_column_stats(self, vector, unique_database):

Reply via email to