This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new ef8f8ca27 IMPALA-13631: (Addendum) Retry aborted concurrent DDLs
ef8f8ca27 is described below
commit ef8f8ca27b52f7fd842a7a887d5c9a8db9831f79
Author: Michael Smith <[email protected]>
AuthorDate: Tue Apr 22 13:32:51 2025 -0700
IMPALA-13631: (Addendum) Retry aborted concurrent DDLs
TestConcurrentDdls has several exceptions it considers acceptable for
testing; it would accept the query failure and continue with other
cases. That was fine for existing queries, but if an ALTER RENAME fails
subsequent queries will also fail because the table does not have the
expected name.
With IMPALA-13631, there are three exception cases we need to handle:
1. "Table/view rename succeeded in the Hive Metastore, but failed in
Impala's Catalog Server" happens when the HMS alter_table RPC
succeeds but local catalog has changed. INVALIDATE METADATA on the
target table is sufficient to bring things in sync.
2. "CatalogException: Table ... was modified while operation was in
progress, aborting execution" can safely be retried.
3. "Couldn't retrieve the catalog topic version for the SYNC_DDL
operation" happens when SYNC_DDL=1 and the DDL runs on a stale table
object that's removed from the cache by a global INVALIDATE.
Adds --max_wait_time_for_sync_ddl_s=10 in catalogd_args for the last
exception to occur. Otherwise the query will just time out.
Tested by running test_concurrent_ddls.py 15 times. The 1st exception
previously would show up within 3-4 runs, while the 2nd exception
happens pretty much every run.
Change-Id: I04d071b62e4f306466a69ebd9e134a37d4327b77
Reviewed-on: http://gerrit.cloudera.org:8080/22802
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Michael Smith <[email protected]>
---
tests/custom_cluster/test_concurrent_ddls.py | 67 +++++++++++++++++++++-------
1 file changed, 50 insertions(+), 17 deletions(-)
diff --git a/tests/custom_cluster/test_concurrent_ddls.py
b/tests/custom_cluster/test_concurrent_ddls.py
index ef04ae0e8..aa19d2018 100644
--- a/tests/custom_cluster/test_concurrent_ddls.py
+++ b/tests/custom_cluster/test_concurrent_ddls.py
@@ -18,7 +18,9 @@
from __future__ import absolute_import, division, print_function
from builtins import range
import pytest
+import re
import threading
+import time
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError
@@ -30,7 +32,9 @@ from tests.util.shell_util import dump_server_stacktraces
class TestConcurrentDdls(CustomClusterTestSuite):
- """Test concurrent DDLs with invalidate metadata"""
+ """Test concurrent DDLs with invalidate metadata
+ TODO: optimize the time dropping the unique_database at the end which
dominates
+ test time. It currently takes >1m. Most of the time spent in HMS."""
def _make_per_impalad_args(local_catalog_enabled):
assert isinstance(local_catalog_enabled, list)
@@ -48,7 +52,7 @@ class TestConcurrentDdls(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=false",
- catalogd_args="--catalog_topic_mode=full")
+ catalogd_args="--catalog_topic_mode=full
--max_wait_time_for_sync_ddl_s=10")
def test_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
self._run_ddls_with_invalidation(unique_database, sync_ddl=True)
@@ -62,7 +66,7 @@ class TestConcurrentDdls(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
start_args=_make_per_impalad_args([True, False]),
- catalogd_args="--catalog_topic_mode=mixed")
+ catalogd_args="--catalog_topic_mode=mixed
--max_wait_time_for_sync_ddl_s=10")
def test_mixed_catalog_ddls_with_invalidate_metadata_sync_ddl(self,
unique_database):
self._run_ddls_with_invalidation(unique_database, sync_ddl=True)
@@ -76,7 +80,7 @@ class TestConcurrentDdls(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
- catalogd_args="--catalog_topic_mode=minimal")
+ catalogd_args="--catalog_topic_mode=minimal
--max_wait_time_for_sync_ddl_s=10")
def test_local_catalog_ddls_with_invalidate_metadata_sync_ddl(self,
unique_database):
self._run_ddls_with_invalidation(unique_database, sync_ddl=True)
@@ -94,6 +98,9 @@ class TestConcurrentDdls(CustomClusterTestSuite):
tls = ThreadLocalClient()
def run_ddls(i):
+ # Add a sleep so global INVALIDATE has more chance to run concurrently
with other
+ # DDLs.
+ time.sleep(i % 5)
tbl_name = db + ".test_" + str(i)
# func_name = "f_" + str(i)
for query in [
@@ -124,14 +131,25 @@ class TestConcurrentDdls(CustomClusterTestSuite):
"insert overwrite table %s_part partition(j=2) "
"values (1), (2), (3), (4), (5)" % tbl_name
]:
- try:
- handle = tls.client.execute_async(query)
- is_finished = tls.client.wait_for_finished_timeout(handle,
timeout=60)
- assert is_finished, "Query timeout(60s): " + query
- tls.client.close_query(handle)
- except IMPALA_CONNECTION_EXCEPTION as e:
- # Could raise exception when running with INVALIDATE METADATA
- assert TestConcurrentDdls.is_acceptable_error(str(e), sync_ddl),
str(e)
+ # Running concurrent with INVALIDATE METADATA can raise an exception.
These are
+ # safe to retry, so do that until we get a success.
+ while True:
+ try:
+ handle = tls.client.execute_async(query)
+ is_finished = tls.client.wait_for_finished_timeout(handle,
timeout=60)
+ assert is_finished, "Query timeout(60s): " + query
+ tls.client.close_query(handle)
+ # Success, next case.
+ break
+ except IMPALA_CONNECTION_EXCEPTION as e:
+ err = str(e)
+ if self.handle_rename_failure(tls.client, tbl_name, err):
+ # Table was successfully renamed, next case.
+ break
+ elif self.is_transient_error(err):
+ # Retry the query.
+ continue
+ assert self.is_acceptable_error(err, sync_ddl), err
self.execute_query_expect_success(tls.client, "invalidate metadata")
return True
@@ -157,18 +175,33 @@ class TestConcurrentDdls(CustomClusterTestSuite):
assert False, "Timeout in thread run_ddls(%d)" % i
@classmethod
- def is_acceptable_error(cls, err, sync_ddl):
+ def is_transient_error(cls, err):
# DDL/DMLs may fail if running with invalidate metadata concurrently,
since in-flight
# table loadings can't finish if the target table is changed (e.g. reset
to unloaded
# state). See more in CatalogOpExecutor.getExistingTable().
if "CatalogException: Table" in err and \
"was modified while operation was in progress, aborting execution" in
err:
return True
+ return False
+
+ @classmethod
+ def is_acceptable_error(cls, err, sync_ddl):
# TODO: Consider remove this case after IMPALA-9135 is fixed.
- if sync_ddl and "Couldn't retrieve the catalog topic version for the
SYNC_DDL " \
- "operation after 5 attempts.The operation has been
successfully " \
- "executed but its effects may have not been broadcast to
all the " \
- "coordinators." in err:
+ if sync_ddl:
+ if "Couldn't retrieve the catalog topic version for the SYNC_DDL
operation" in err\
+ and ("The operation has been successfully executed but its effects may
have not "
+ "been broadcast to all the coordinators.") in err:
+ return True
+ return False
+
+ def handle_rename_failure(self, client, tbl_name, err):
+ if "Table/view rename succeeded in the Hive Metastore, " \
+ "but failed in Impala's Catalog Server." in err:
+ # Invalidate the target table so we reload it from HMS.
+ tbl_names = re.findall(r"{}[^']*".format(tbl_name), err)
+ assert len(tbl_names) == 2
+ self.execute_query_expect_success(
+ client, "invalidate metadata {0}".format(tbl_names[1]))
return True
return False