Repository: incubator-impala
Updated Branches:
  refs/heads/master d9602df71 -> e1b092de6
IMPALA-4762: RECOVER PARTITIONS should batch partition updates

Batch updates when doing a RECOVER PARTITIONS on over 500 partitions
at a time, to avoid HMS timeouts and possible OOM.

Testing: Expanded test coverage with a new Python test for this case.
The test takes ~18s to run.

Change-Id: I7f9334051b11ba8fa16159b7ca67ddc7f2392733
Reviewed-on: http://gerrit.cloudera.org:8080/6275
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/eec8d6fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/eec8d6fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/eec8d6fd

Branch: refs/heads/master
Commit: eec8d6fd15dc9e914a773aab1390f95d97f515ae
Parents: d9602df
Author: Zach Amsden <[email protected]>
Authored: Mon Mar 6 23:43:45 2017 +0000
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Mar 14 01:38:59 2017 +0000

----------------------------------------------------------------------
 .../impala/service/CatalogOpExecutor.java | 52 ++++++++++----------
 tests/metadata/test_recover_partitions.py | 31 ++++++++++++
 2 files changed, 58 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
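The heart of the change is a loop that slices the pending partition list
into fixed-size sublists so no single HMS RPC grows with the total
partition count. Below is a minimal, self-contained sketch of that
batching pattern, not Impala code: the RpcClient interface and the
main() driver are hypothetical stand-ins, and the constant's value of
500 is assumed from the threshold named in the commit message.

import java.util.ArrayList;
import java.util.List;

public class BatchedPartitionUpdates {
  // Caps the number of partition updates carried by a single metastore
  // RPC. 500 is assumed here, based on the commit message's threshold.
  private static final int MAX_PARTITION_UPDATES_PER_RPC = 500;

  // Hypothetical stand-in for the Hive metastore client call.
  interface RpcClient {
    void addPartitions(List<String> batch);
  }

  // Apply the updates in batches of MAX_PARTITION_UPDATES_PER_RPC,
  // mirroring the loop structure introduced in CatalogOpExecutor.java
  // in the diff below.
  static void applyInBatches(List<String> partitions, RpcClient client) {
    for (int i = 0; i < partitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
      int end = Math.min(i + MAX_PARTITION_UPDATES_PER_RPC, partitions.size());
      // subList() is a view, so each RPC sees at most
      // MAX_PARTITION_UPDATES_PER_RPC entries without copying the list.
      client.addPartitions(partitions.subList(i, end));
    }
  }

  public static void main(String[] args) {
    List<String> parts = new ArrayList<>();
    for (int i = 1; i < 700; ++i) parts.add("s=part" + i);
    // 699 partitions, as in the new test, yield two RPCs: 500 + 199 entries.
    applyInBatches(parts, batch ->
        System.out.println("add_partitions RPC with " + batch.size() + " entries"));
  }
}

The patch applies this same sublist loop to both add_partitions and
alter_partitions, so neither call's payload grows with the table size.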
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eec8d6fd/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 9d82c07..5011769 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2633,31 +2633,33 @@ public class CatalogOpExecutor {
 
     // Add partitions to metastore.
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // ifNotExists and needResults are true.
-      hmsPartitions = msClient.getHiveClient().add_partitions(hmsPartitions,
-          true, true);
-      for (Partition partition: hmsPartitions) {
-        // Create and add the HdfsPartition. Return the table object with an updated
-        // catalog version.
-        addHdfsPartition(tbl, partition);
-      }
+      // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
+      for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
+        int endPartitionIndex =
+            Math.min(i + MAX_PARTITION_UPDATES_PER_RPC, hmsPartitions.size());
+        List<Partition> hmsSublist = hmsPartitions.subList(i, endPartitionIndex);
+        // ifNotExists and needResults are true.
+        List<Partition> hmsAddedPartitions =
+            msClient.getHiveClient().add_partitions(hmsSublist, true, true);
+        for (Partition partition: hmsAddedPartitions) {
+          // Create and add the HdfsPartition. Return the table object with an updated
+          // catalog version.
+          addHdfsPartition(tbl, partition);
+        }
 
-      // Handle HDFS cache.
-      if (cachePoolName != null) {
-        for (Partition partition: hmsPartitions) {
-          long id = HdfsCachingUtil.submitCachePartitionDirective(partition,
-              cachePoolName, replication);
-          cacheIds.add(id);
+        // Handle HDFS cache.
+        if (cachePoolName != null) {
+          for (Partition partition: hmsAddedPartitions) {
+            long id = HdfsCachingUtil.submitCachePartitionDirective(partition,
+                cachePoolName, replication);
+            cacheIds.add(id);
+          }
+          // Update the partition metadata to include the cache directive id.
+          msClient.getHiveClient().alter_partitions(tableName.getDb(),
+              tableName.getTbl(), hmsAddedPartitions);
         }
-        // Update the partition metadata to include the cache directive id.
-        msClient.getHiveClient().alter_partitions(tableName.getDb(),
-            tableName.getTbl(), hmsPartitions);
+        updateLastDdlTime(msTbl, msClient);
       }
-      updateLastDdlTime(msTbl, msClient);
-    } catch (AlreadyExistsException e) {
-      // This may happen when another client of HMS has added the partitions.
-      LOG.trace(String.format("Ignoring '%s' when adding partition to %s.", e,
-          tableName));
     } catch (TException e) {
       throw new ImpalaRuntimeException(
           String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partition"), e);
@@ -2956,15 +2958,15 @@ public class CatalogOpExecutor {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
       for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
-        int numPartitionsToUpdate =
+        int endPartitionIndex =
             Math.min(i + MAX_PARTITION_UPDATES_PER_RPC, hmsPartitions.size());
         try {
           // Alter partitions in bulk.
           msClient.getHiveClient().alter_partitions(dbName, tableName,
-              hmsPartitions.subList(i, numPartitionsToUpdate));
+              hmsPartitions.subList(i, endPartitionIndex));
           // Mark the corresponding HdfsPartition objects as dirty
           for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
-              hmsPartitions.subList(i, numPartitionsToUpdate)) {
+              hmsPartitions.subList(i, endPartitionIndex)) {
             try {
               catalog_.getHdfsPartition(dbName, tableName, msPartition).markDirty();
             } catch (PartitionNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/eec8d6fd/tests/metadata/test_recover_partitions.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_recover_partitions.py b/tests/metadata/test_recover_partitions.py
index cb0cf98..58a7219 100644
--- a/tests/metadata/test_recover_partitions.py
+++ b/tests/metadata/test_recover_partitions.py
@@ -168,6 +168,37 @@ class TestRecoverPartitions(ImpalaTestSuite):
     assert self.has_value(INSERTED_VALUE, result.data)
 
   @SkipIfLocal.hdfs_client
+  def test_recover_many_partitions(self, vector, unique_database):
+    """Test that RECOVER PARTITIONS correctly discovers new partitions added externally
+    by the hdfs client, and that they are recovered in batches."""
+
+    TBL_NAME = "test_recover_partitions"
+    FQ_TBL_NAME = unique_database + "." + TBL_NAME
+    TBL_LOCATION = self.__get_fs_location(unique_database, TBL_NAME)
+
+    self.execute_query_expect_success(self.client,
+        "CREATE TABLE %s (c int) PARTITIONED BY (s string)" % (FQ_TBL_NAME))
+
+    # Create 699 partition directories externally.
+    for i in xrange(1, 700):
+      PART_DIR = "s=part%d/" % i
+      FILE_PATH = "test"
+      self.filesystem_client.make_dir(TBL_LOCATION + PART_DIR)
+
+    result = self.execute_query_expect_success(self.client,
+        "SHOW PARTITIONS %s" % FQ_TBL_NAME)
+    for i in xrange(1, 700):
+      PART_DIR = "part%d\t" % i
+      assert not self.has_value(PART_DIR, result.data)
+    self.execute_query_expect_success(self.client,
+        "ALTER TABLE %s RECOVER PARTITIONS" % FQ_TBL_NAME)
+    result = self.execute_query_expect_success(self.client,
+        "SHOW PARTITIONS %s" % FQ_TBL_NAME)
+    for i in xrange(1, 700):
+      PART_DIR = "part%d\t" % i
+      assert self.has_value(PART_DIR, result.data)
+
+  @SkipIfLocal.hdfs_client
   def test_duplicate_partitions(self, vector, unique_database):
     """Test that RECOVER PARTITIONS does not recover equivalent partitions.
     Two partitions are considered equivalent if they correspond to distinct paths but can be converted
