This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 70b7b6a78 IMPALA-13134: DDL hang with SYNC_DDL enabled when Catalogd
is changed to standby status
70b7b6a78 is described below
commit 70b7b6a78d49c30933d79e0a1c2a725f7e0a3e50
Author: wzhou-code <[email protected]>
AuthorDate: Tue Jun 4 21:25:57 2024 -0700
IMPALA-13134: DDL hang with SYNC_DDL enabled when Catalogd is changed to
standby status
Catalogd waits for the SYNC_DDL version when it processes a DDL with
SYNC_DDL enabled. If the status of Catalogd is changed from active to
standby while CatalogServiceCatalog.waitForSyncDdlVersion() is running,
the now-standby catalogd no longer receives catalog topic updates from
the statestore, so the catalogd thread waits indefinitely.
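This patch fixes the issue by regenerating the service ID when Catalogd
is changed to standby status, and by throwing an exception if the service
ID has changed while waiting for the SYNC_DDL version. The check is only
performed when catalogd HA is enabled (--enable_catalogd_ha), so that
flag is now passed to the frontend via TBackendGflags.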
Testing:
- Added a test for CatalogD HA that runs a DDL with SYNC_DDL enabled,
  injects a delay while waiting for the SYNC_DDL version, and then
  verifies that the DDL query fails due to catalogd failover.
- Passed test_catalogd_ha.py.
Change-Id: I2dcd628cff3c10d2e7566ba2d9de0b5886a18fc1
Reviewed-on: http://gerrit.cloudera.org:8080/21480
Reviewed-by: Riza Suminto <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/catalog/catalog-server.cc | 2 ++
be/src/util/backend-gflag-util.cc | 2 ++
common/thrift/BackendGflags.thrift | 2 ++
.../impala/catalog/CatalogServiceCatalog.java | 22 +++++++++++++
.../org/apache/impala/service/BackendConfig.java | 4 +++
.../java/org/apache/impala/util/DebugUtils.java | 4 +++
tests/custom_cluster/test_catalogd_ha.py | 37 ++++++++++++++++++++++
7 files changed, 73 insertions(+)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 5f318278a..d9d8cd0e6 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -713,6 +713,8 @@ void CatalogServer::UpdateActiveCatalogd(bool
is_registration_reply,
<< TNetworkAddressToString(catalogd_registration.address)
<< ", active_catalogd_version: "
<< active_catalogd_version;
+ // Regenerate Catalog Service ID.
+ catalog_->RegenerateServiceId();
}
}
}
diff --git a/be/src/util/backend-gflag-util.cc
b/be/src/util/backend-gflag-util.cc
index 48bb739dc..d7cf69125 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -128,6 +128,7 @@ DECLARE_string(common_hms_event_types);
DECLARE_int32(dbcp_max_conn_pool_size);
DECLARE_int32(dbcp_max_wait_millis_for_conn);
DECLARE_int32(dbcp_data_source_idle_timeout_s);
+DECLARE_bool(enable_catalogd_ha);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -491,6 +492,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
#else
cfg.__set_is_release_build(false);
#endif
+ cfg.__set_enable_catalogd_ha(FLAGS_enable_catalogd_ha);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift
b/common/thrift/BackendGflags.thrift
index 122d6ea5e..234d0f231 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -306,4 +306,6 @@ struct TBackendGflags {
137: required i32 dbcp_data_source_idle_timeout
138: required bool is_release_build
+
+ 139: required bool enable_catalogd_ha
}
diff --git
a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index b58970ea7..666eb8b4a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -132,9 +132,11 @@ import org.apache.impala.thrift.TTableName;
import org.apache.impala.thrift.TTableType;
import org.apache.impala.thrift.TTableUsage;
import org.apache.impala.thrift.TTableUsageMetrics;
+import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUpdateTableUsageRequest;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.CatalogBlacklistUtils;
+import org.apache.impala.util.DebugUtils;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.FunctionUtils;
import org.apache.impala.util.NoOpEventSequence;
@@ -152,6 +154,7 @@ import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -3499,10 +3502,29 @@ public class CatalogServiceCatalog extends Catalog {
long numAttempts = 0;
long begin = System.currentTimeMillis();
long versionToWaitFor = -1;
+ TUniqueId serviceId = JniCatalog.getServiceId();
while (versionToWaitFor == -1) {
if (LOG.isTraceEnabled()) {
LOG.trace("waitForSyncDdlVersion() attempt: " + numAttempts);
}
+ if (BackendConfig.INSTANCE.isCatalogdHAEnabled()) {
+ // Catalog serviceId is changed when the HA role of the catalog
instance is
+ // changed from active to standby, or from standby to active. Inactive
catalogd
+ // does not receive catalog topic updates from the statestore. To
avoid waiting
+ // indefinitely, throw exception if its service id has been changed.
+ if (!Strings.isNullOrEmpty(BackendConfig.INSTANCE.debugActions())) {
+ DebugUtils.executeDebugAction(
+ BackendConfig.INSTANCE.debugActions(),
DebugUtils.WAIT_SYNC_DDL_VER_DELAY);
+ }
+ if (!serviceId.equals(JniCatalog.getServiceId())) {
+ String errorMsg = "Couldn't retrieve the catalog topic update for
the " +
+ "SYNC_DDL operation since HA role of this catalog instance has
been " +
+ "changed. The operation has been successfully executed but its
effects " +
+ "may have not been broadcast to all the coordinators.";
+ LOG.error(errorMsg);
+ throw new CatalogException(errorMsg);
+ }
+ }
// Examine the topic update log to determine the latest topic update that
// covers the added/modified/deleted objects in 'result'.
long topicVersionForUpdates =
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 6325176bc..326ee5ebf 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -525,4 +525,8 @@ public class BackendConfig {
public boolean isReleaseBuild() {
return backendCfg_.is_release_build;
}
+
+ public boolean isCatalogdHAEnabled() {
+ return backendCfg_.enable_catalogd_ha;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
index 67b05e4e4..141faf80e 100644
--- a/fe/src/main/java/org/apache/impala/util/DebugUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/DebugUtils.java
@@ -86,6 +86,10 @@ public class DebugUtils {
// debug action to enable eventProcessor
public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor";
+ // debug action label to inject a delay when waiting SYNC DDL version
+ public static final String WAIT_SYNC_DDL_VER_DELAY =
+ "catalogd_wait_sync_ddl_version_delay";
+
/**
* Returns true if the label of action is set in the debugActions
*/
diff --git a/tests/custom_cluster/test_catalogd_ha.py
b/tests/custom_cluster/test_catalogd_ha.py
index c2b5cdb0f..f8ebff6b9 100644
--- a/tests/custom_cluster/test_catalogd_ha.py
+++ b/tests/custom_cluster/test_catalogd_ha.py
@@ -21,6 +21,7 @@ import logging
import re
import requests
+from beeswaxd.BeeswaxService import QueryState
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import build_flavor_timeout
from tests.util.filesystem_utils import get_fs_path
@@ -435,6 +436,42 @@ class TestCatalogdHA(CustomClusterTestSuite):
# Verify simple queries are ran successfully.
self.__run_simple_queries()
+ @CustomClusterTestSuite.with_args(
+ statestored_args="--use_subscriber_id_as_catalogd_priority=true",
+
catalogd_args="--debug_actions='catalogd_wait_sync_ddl_version_delay:SLEEP@5000'",
+ start_args="--enable_catalogd_ha")
+ def test_catalogd_failover_with_sync_ddl(self, unique_database):
+ """Tests for Catalog Service force fail-over when running DDL with SYNC_DDL
+ enabled."""
+ # Verify two catalogd instances are created with one as active.
+ catalogds = self.cluster.catalogds()
+ assert(len(catalogds) == 2)
+ catalogd_service_1 = catalogds[0].service
+ catalogd_service_2 = catalogds[1].service
+ assert(catalogd_service_1.get_metric_value("catalog-server.active-status"))
+ assert(not
catalogd_service_2.get_metric_value("catalog-server.active-status"))
+
+ # Run DDL with SYNC_DDL enabled.
+ client = self.cluster.impalads[0].service.create_beeswax_client()
+ assert client is not None
+ self.execute_query_expect_success(client, "set SYNC_DDL=1")
+ ddl_query = "CREATE TABLE {database}.failover_sync_ddl (c int)"
+ handle = client.execute_async(ddl_query.format(database=unique_database))
+
+ # Restart standby catalogd with force_catalogd_active as true.
+ catalogds[1].kill()
+ catalogds[1].start(wait_until_ready=True,
+ additional_args="--force_catalogd_active=true")
+ # Wait until original active catalogd becomes in-active.
+ catalogd_service_1 = catalogds[0].service
+ catalogd_service_1.wait_for_metric_value(
+ "catalog-server.active-status", expected_value=False, timeout=15)
+ assert(not
catalogd_service_1.get_metric_value("catalog-server.active-status"))
+
+ # Verify that the query is failed due to the Catalogd HA fail-over.
+ self.wait_for_state(handle, QueryState.EXCEPTION, 30, client=client)
+ client.close()
+
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=true",