Repository: impala Updated Branches: refs/heads/master 5cc49c343 -> 8c93a4568
IMPALA-7717: Handle concurrent partition changes in local catalog mode Current code throws a RuntimeException (RTE) when a partial fetch RPC looks up partition metadata and the corresponding partition ID is missing on the Catalog server. There are two cases here. 1. The partition could be genuinely missing because it was dropped by a concurrent operation. 2. Partial fetch RPCs look up partitions by IDs instead of names. This is problematic since the IDs can change over the lifetime of a table. In both cases, throwing an RTE is not the right approach, and for (2) we need to transparently retry the fetch with the new partition ID. We eventually need to fix (2), since looking up partitions by ID is not the right approach. Testing: Updated an end-to-end test which fails without the patch. Change-Id: I2aa103ee159ce9478af9b5b27b36bc0cc286f442 Reviewed-on: http://gerrit.cloudera.org:8080/11732 Reviewed-by: Bharath Vissapragada <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/8c93a456 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/8c93a456 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/8c93a456 Branch: refs/heads/master Commit: 8c93a456891587c1add30a08fa8ab395208e0cf1 Parents: 5cc49c3 Author: Bharath Vissapragada <[email protected]> Authored: Thu Oct 18 16:09:31 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Oct 19 09:22:05 2018 +0000 ---------------------------------------------------------------------- common/thrift/CatalogService.thrift | 7 ++- .../org/apache/impala/catalog/HdfsTable.java | 9 +++- .../catalog/local/CatalogdMetaProvider.java | 1 + .../impala/catalog/PartialCatalogInfoTest.java | 9 ++-- tests/custom_cluster/test_local_catalog.py | 55 +++----------------- 5 files changed, 25 insertions(+), 56 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/common/thrift/CatalogService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index c0792b3..6237c1f 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -379,7 +379,12 @@ enum CatalogLookupStatus { DB_NOT_FOUND, TABLE_NOT_FOUND, TABLE_NOT_LOADED, - FUNCTION_NOT_FOUND + FUNCTION_NOT_FOUND, + // Partial fetch RPCs currently look up partitions by IDs instead of names. These IDs + // change over the lifetime of a table with queries like invalidate metadata. In such + // cases this lookup status is set and the caller can retry the fetch. + // TODO: Fix partition lookup logic to not do it with IDs. + PARTITION_NOT_FOUND } // RPC response for GetPartialCatalogObject. http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 1bca8ec..4eba255 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -63,6 +63,7 @@ import org.apache.impala.common.Reference; import org.apache.impala.compat.HdfsShim; import org.apache.impala.fb.FbFileBlock; import org.apache.impala.service.BackendConfig; +import org.apache.impala.thrift.CatalogLookupStatus; import org.apache.impala.thrift.CatalogObjectsConstants; import org.apache.impala.thrift.TAccessLevel; import org.apache.impala.thrift.TCatalogObjectType; @@ -1709,8 +1710,12 @@ public class HdfsTable extends Table implements FeFsTable { resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size()); 
for (long partId : partIds) { HdfsPartition part = partitionMap_.get(partId); - Preconditions.checkArgument(part != null, "Partition id %s does not exist", - partId); + if (part == null) { + LOG.warn(String.format("Missing partition ID: %s, Table: %s", partId, + getFullName())); + return new TGetPartialCatalogObjectResponse().setLookup_status( + CatalogLookupStatus.PARTITION_NOT_FOUND); + } TPartialPartitionInfo partInfo = new TPartialPartitionInfo(partId); if (req.table_info_selector.want_partition_names) { http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index e099b53..370e4b2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -345,6 +345,7 @@ public class CatalogdMetaProvider implements MetaProvider { case FUNCTION_NOT_FOUND: case TABLE_NOT_FOUND: case TABLE_NOT_LOADED: + case PARTITION_NOT_FOUND: invalidateCacheForObject(req.object_desc); throw new InconsistentMetadataFetchException( String.format("Fetching %s failed. 
Could not find %s", http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java index 63e0fda..92fff65 100644 --- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.impala.common.InternalException; import org.apache.impala.service.BackendConfig; import org.apache.impala.testutil.CatalogServiceTestCatalog; +import org.apache.impala.thrift.CatalogLookupStatus; import org.apache.impala.thrift.TCatalogInfoSelector; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; @@ -175,12 +176,8 @@ public class PartialCatalogInfoTest { req.table_info_selector = new TTableInfoSelector(); req.table_info_selector.want_partition_metadata = true; req.table_info_selector.partition_ids = ImmutableList.of(-12345L); // non-existent - try { - sendRequest(req); - fail("did not throw exception for missing partition"); - } catch (IllegalArgumentException iae) { - assertEquals("Partition id -12345 does not exist", iae.getMessage()); - } + TGetPartialCatalogObjectResponse resp = sendRequest(req); + assertEquals(resp.lookup_status, CatalogLookupStatus.PARTITION_NOT_FOUND); } @Test http://git-wip-us.apache.org/repos/asf/impala/blob/8c93a456/tests/custom_cluster/test_local_catalog.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py index 3815f34..5445217 100644 --- a/tests/custom_cluster/test_local_catalog.py +++ 
b/tests/custom_cluster/test_local_catalog.py @@ -340,63 +340,24 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): replans_seen = [0] replans_seen_lock = threading.Lock() + # Queue to propagate exceptions from failed queries, if any. + failed_queries = Queue.Queue() + def stress_thread(client): while replans_seen[0] == 0: # TODO(todd) EXPLAIN queries don't currently yield a profile, so # we have to actually run a COUNT query. q = random.choice([ - 'refresh functional.alltypes', + 'invalidate metadata functional.alltypes', 'select count(*) from functional.alltypes where month=4', 'select count(*) from functional.alltypes where month=5']) - ret = self.execute_query_expect_success(client, q) - if RETRY_PROFILE_MSG in ret.runtime_profile: - with replans_seen_lock: - replans_seen[0] += 1 - - threads = [threading.Thread(target=stress_thread, args=(c,)) - for c in [client1, client2]] - for t in threads: - t.start() - for t in threads: - t.join(30) - assert replans_seen[0] > 0, "Did not trigger any re-plans" - - finally: - client1.close() - client2.close() - - @pytest.mark.execute_serially - @CustomClusterTestSuite.with_args( - impalad_args="--use_local_catalog=true", - catalogd_args="--catalog_topic_mode=minimal") - def test_concurrent_invalidate_with_queries(self, unique_database): - """ - Tests that the queries are replanned when they clash with concurrent invalidates. - """ - # TODO: Merge this with the above test after fixing IMPALA-7717 - try: - impalad1 = self.cluster.impalads[0] - impalad2 = self.cluster.impalads[1] - client1 = impalad1.service.create_beeswax_client() - client2 = impalad2.service.create_beeswax_client() - # Track the number of replans. - replans_seen = [0] - replans_seen_lock = threading.Lock() - - # Queue to propagate exceptions from failed queries, if any. 
- failed_queries = Queue.Queue() - - def stress_thread(client): - while replans_seen[0] == 0: - q = random.choice([ - 'invalidate metadata functional.alltypesnopart', - 'select count(*) from functional.alltypesnopart', - 'select count(*) from functional.alltypesnopart']) try: ret = self.execute_query_expect_success(client, q) except Exception as e: failed_queries.put((q, str(e))) + continue + if RETRY_PROFILE_MSG in ret.runtime_profile: with replans_seen_lock: replans_seen[0] += 1 @@ -407,8 +368,8 @@ class TestLocalCatalogRetries(CustomClusterTestSuite): t.start() for t in threads: t.join(30) - assert failed_queries.empty(),\ - "Failed query count non zero: %s" % list(failed_queries.queue) + assert failed_queries.empty(), "Failed queries encountered: %s" %\ + list(failed_queries.queue) assert replans_seen[0] > 0, "Did not trigger any re-plans" finally:
