This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ab92a300f IMPALA-13631: alterTableOrViewRename shouldn't hold catalog
versionLock during external RPCs
ab92a300f is described below
commit ab92a300fc9363895418690b87b4a73df7a7202d
Author: stiga-huang <[email protected]>
AuthorDate: Wed Apr 16 18:30:05 2025 +0800
IMPALA-13631: alterTableOrViewRename shouldn't hold catalog versionLock
during external RPCs
Catalog versionLock is a lock used to synchronize reads/writes of
catalogVersion. It can be used to perform atomic bulk catalog operations
since catalogVersion cannot change externally while the lock is being
held. All other catalog operations will be blocked if the current thread
holds the lock. So it shouldn't be held for a long time, especially when
the current thread is invoking external RPCs for a table.
CatalogOpExecutor.alterTable() is one place that could hold the lock for
a long time. If the ALTER operation is a RENAME, it holds the lock until
alterTableOrViewRename() finishes. HMS RPCs are invoked in this method
to perform the operation, which might take an unpredictable amount of time. The
motivation of holding this lock is that RENAME is implemented as a DROP
+ ADD in the catalog cache. So this operation can be atomic. However,
that doesn't mean we need the lock before operating the cache in
CatalogServiceCatalog.renameTable(). We actually acquires the lock again
in this method. So no need to keep holding the lock when invoking HMS
RPCs.
This patch removes holding the lock in alterTableOrViewRename().
Tests
- Added e2e test for concurrent rename operations.
- Also added some rename operations in test_concurrent_ddls.py
Change-Id: Ie5f443b1e167d96024b717ce70ca542d7930cb4b
Reviewed-on: http://gerrit.cloudera.org:8080/22789
Tested-by: Impala Public Jenkins <[email protected]>
Reviewed-by: Riza Suminto <[email protected]>
Reviewed-by: Joe McDonnell <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
---
.../apache/impala/service/CatalogOpExecutor.java | 22 +++------
tests/custom_cluster/test_concurrent_ddls.py | 4 ++
tests/metadata/test_ddl.py | 54 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 15 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 4461c03f5..565ee58b2 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1213,28 +1213,21 @@ public class CatalogOpExecutor {
// version to the table being altered.
InProgressTableModification modification =
new InProgressTableModification(catalog_, tbl);
+ catalog_.getLock().writeLock().unlock();
modification.addCatalogServiceIdentifiersToTable();
final Timer.Context context
= tbl.getMetrics().getTimer(Table.ALTER_DURATION_METRIC).time();
try {
if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
|| params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
- // RENAME is implemented as an ADD + DROP, so we need to execute it as
we hold
- // the catalog lock.
- try {
- alterTableOrViewRename(tbl,
-
TableName.fromThrift(params.getRename_params().getNew_table_name()),
- modification, wantMinimalResult, response, catalogTimeline);
- modification.validateInProgressModificationComplete();
- return;
- } finally {
- // release the version taken in the tryLock call above
- catalog_.getLock().writeLock().unlock();
- }
+ alterTableOrViewRename(tbl,
+
TableName.fromThrift(params.getRename_params().getNew_table_name()),
+ modification, wantMinimalResult, response, catalogTimeline);
+ modification.validateInProgressModificationComplete();
+ return;
}
String responseSummaryMsg = null;
- catalog_.getLock().writeLock().unlock();
if (tbl instanceof KuduTable && altersKuduTable(params.getAlter_type()))
{
alterKuduTable(params, response, (KuduTable) tbl, modification,
wantMinimalResult,
@@ -5687,8 +5680,7 @@ public class CatalogOpExecutor {
private void alterTableOrViewRename(Table oldTbl, TableName newTableName,
InProgressTableModification modification, boolean wantMinimalResult,
TDdlExecResponse response, EventSequence catalogTimeline) throws
ImpalaException {
- Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread()
- && catalog_.getLock().isWriteLockedByCurrentThread());
+ Preconditions.checkState(oldTbl.isWriteLockedByCurrentThread());
TableName tableName = oldTbl.getTableName();
org.apache.hadoop.hive.metastore.api.Table msTbl =
oldTbl.getMetaStoreTable().deepCopy();
diff --git a/tests/custom_cluster/test_concurrent_ddls.py
b/tests/custom_cluster/test_concurrent_ddls.py
index 1eef3b76e..ef04ae0e8 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -110,6 +110,10 @@ class TestConcurrentDdls(CustomClusterTestSuite):
# Below queries could fail if running with invalidate metadata
concurrently
"alter table %s_part add partition (j=1)" % tbl_name,
"alter table %s_part add partition (j=2)" % tbl_name,
+ "alter table {0} rename to {0}_2".format(tbl_name),
+ "alter table {0}_part rename to {0}_part2".format(tbl_name),
+ "alter table {0}_2 rename to {0}".format(tbl_name),
+ "alter table {0}_part2 rename to {0}_part".format(tbl_name),
"invalidate metadata %s_part" % tbl_name,
"refresh %s" % tbl_name,
"refresh %s_part" % tbl_name,
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index b2a5b7682..63175b55e 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -22,6 +22,9 @@ import itertools
import pytest
import re
import time
+import threading
+from multiprocessing.pool import ThreadPool
+from multiprocessing import TimeoutError
from copy import deepcopy
from tests.metadata.test_ddl_base import TestDdlBase
@@ -52,6 +55,7 @@ from tests.util.filesystem_utils import (
FILESYSTEM_NAME)
from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import FILESYSTEM_PREFIX
+from tests.util.shell_util import dump_server_stacktraces
def get_trash_path(bucket, path):
@@ -479,6 +483,56 @@ class TestDdlStatements(TestDdlBase):
self.run_test_case('QueryTest/alter-table-set-column-stats', vector,
use_db=unique_database,
multiple_impalad=self._use_multiple_impalad(vector))
+ @UniqueDatabase.parametrize(num_dbs=2)
+ def test_concurrent_alter_table_rename(self, vector, unique_database):
+ test_self = self
+
+ class ThreadLocalClient(threading.local):
+ def __init__(self):
+ self.client = test_self.create_impala_client_from_vector(vector)
+
+ pool = ThreadPool(processes=8)
+ tlc = ThreadLocalClient()
+
+ def run_rename(i):
+ if i % 2 == 0:
+ tlc.client.execute("set sync_ddl=1")
+ is_partitioned = i % 4 < 2
+ tbl_name = "{}.tbl_{}".format(unique_database, i)
+ tlc.client.execute("create table {}(i int){}".format(
+ tbl_name, "partitioned by(p int)" if is_partitioned else ""))
+ if i % 8 < 4:
+ # Rename inside the same db
+ new_tbl_name = tbl_name + "_new"
+ else:
+ # Move to another db
+ new_tbl_name = "{}2.tbl_{}".format(unique_database, i)
+ stmts = [
+ "alter table {} rename to {}".format(tbl_name, new_tbl_name),
+ "alter table {} rename to {}".format(new_tbl_name, tbl_name),
+ ]
+ # Move the table back and forth in several rounds
+ for _ in range(4):
+ for query in stmts:
+ # Run the query asynchronously to avoid getting stuck by it
+ handle = tlc.client.execute_async(query)
+ is_finished = tlc.client.wait_for_finished_timeout(handle,
timeout=60)
+ assert is_finished, "Query timeout(60s): " + query
+ tlc.client.close_query(handle)
+ return True
+
+ # Run renames in parallel
+ NUM_ITERS = 16
+ worker = [None] * (NUM_ITERS + 1)
+ for i in range(1, NUM_ITERS + 1):
+ worker[i] = pool.apply_async(run_rename, (i,))
+ for i in range(1, NUM_ITERS + 1):
+ try:
+ assert worker[i].get(timeout=100)
+ except TimeoutError:
+ dump_server_stacktraces()
+ assert False, "Timeout in thread run_ddls(%d)" % i
+
@SkipIfFS.hbase
@UniqueDatabase.parametrize(sync_ddl=True)
def test_alter_hbase_set_column_stats(self, vector, unique_database):