IMPALA-4524: Batch ALTER TABLE...ADD PARTITION calls.

This commit allows users to add more than 500 (=MAX_PARTITION_UPDATES_PER_RPC)
partitions in a single ALTER TABLE command. We batch the operations against
Hive into groups of 500.
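
For illustration only (not part of the change itself): a minimal, standalone sketch of
the batching scheme, assuming Guava's Lists.partition(), which is the helper the new
CatalogOpExecutor code uses. The class name, the String stand-ins for HMS Partition
objects, and the printing are hypothetical; only the 500 batch size and the
"one sublist per add_partitions() RPC" idea come from the commit.

    import com.google.common.collect.Lists;
    import java.util.ArrayList;
    import java.util.List;

    public class PartitionBatchingSketch {
      // Mirrors CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC.
      private static final int MAX_PARTITION_UPDATES_PER_RPC = 500;

      public static void main(String[] args) {
        // Stand-ins for the HMS Partition objects built from one ALTER TABLE statement.
        List<String> allPartitionsToAdd = new ArrayList<>();
        for (int i = 0; i < 1002; ++i) allPartitionsToAdd.add("p=" + i);

        // Each sublist would become one add_partitions() RPC against the Hive
        // Metastore, so 1002 partitions split into three batches: 500 + 500 + 2.
        int rpcCount = 0;
        for (List<String> batch :
            Lists.partition(allPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
          ++rpcCount;
          System.out.println("RPC " + rpcCount + ": " + batch.size() + " partitions");
        }
      }
    }
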
I tested this manually, creating 1002 partitions and observing the expected
3 API calls against the Hive Metastore in the log (1002 partitions are sent
as batches of 500, 500, and 2). Existing tests did not cover this, so a new,
simple test has been added that confirms that creating 502 partitions works.

Change-Id: I95f8221ff08c0f126f951f7d37ff5e57985f855f
Reviewed-on: http://gerrit.cloudera.org:8080/8238
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/308bd5d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/308bd5d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/308bd5d5

Branch: refs/heads/master
Commit: 308bd5d58812470c6b760f889547ed90832eacb3
Parents: 6ae08ef
Author: Philip Zeyliger <[email protected]>
Authored: Mon Oct 9 11:49:13 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Oct 24 07:40:16 2017 +0000

----------------------------------------------------------------------
 .../analysis/AlterTableAddPartitionStmt.java    |  5 ---
 .../impala/service/CatalogOpExecutor.java       | 46 ++++++++++----------
 .../apache/impala/analysis/AnalyzeDDLTest.java  | 19 --------
 tests/metadata/test_ddl.py                      | 23 +++++++++-
 4 files changed, 45 insertions(+), 48 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
index 151f245..59fdf2b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
@@ -88,11 +88,6 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt {
       throw new AnalysisException("ALTER TABLE ADD PARTITION is not supported for " +
           "Kudu tables: " + table.getTableName());
     }
-    if (partitions_.size() > CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC) {
-      throw new AnalysisException(
-          String.format("One ALTER TABLE ADD PARTITION cannot add more than %d " +
-          "partitions.", CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC));
-    }
     Set<String> partitionSpecs = Sets.newHashSet();
     for (PartitionDef p: partitions_) {
       p.analyze(analyzer);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index da2c931..0ae9a25 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1925,7 +1925,7 @@ public class CatalogOpExecutor {
     TableName tableName = tbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
     boolean ifNotExists = addPartParams.isIf_not_exists();
-    List<Partition> hmsPartitionsToAdd = Lists.newArrayList();
+    List<Partition> allHmsPartitionsToAdd = Lists.newArrayList();
     Map<List<String>, THdfsCachingOp> partitionCachingOpMap =
        Maps.newHashMap();
     for (TPartitionDef partParams: addPartParams.getPartitions()) {
       List<TPartitionKeyValue> partitionSpec = partParams.getPartition_spec();
@@ -1943,23 +1943,26 @@ public class CatalogOpExecutor {
       Partition hmsPartition = createHmsPartition(partitionSpec, msTbl, tableName,
           partParams.getLocation());
-      hmsPartitionsToAdd.add(hmsPartition);
+      allHmsPartitionsToAdd.add(hmsPartition);
       THdfsCachingOp cacheOp = partParams.getCache_op();
       if (cacheOp != null) partitionCachingOpMap.put(hmsPartition.getValues(), cacheOp);
     }
-    if (hmsPartitionsToAdd.isEmpty()) return null;
+    if (allHmsPartitionsToAdd.isEmpty()) return null;
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // Add partitions in bulk
-      List<Partition> addedHmsPartitions = null;
-      try {
-        addedHmsPartitions = msClient.getHiveClient().add_partitions(hmsPartitionsToAdd,
-            ifNotExists, true);
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
+      List<Partition> addedHmsPartitions = Lists.newArrayList();
+
+      for (List<Partition> hmsSublist :
+          Lists.partition(allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
+        try {
+          addedHmsPartitions.addAll(msClient.getHiveClient().add_partitions(hmsSublist,
+              ifNotExists, true));
+        } catch (TException e) {
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
+        }
       }

       // Handle HDFS cache. This is done in a separate round bacause we have to apply
@@ -1970,8 +1973,8 @@ public class CatalogOpExecutor {
       // If 'ifNotExists' is true, add_partitions() may fail to add all the partitions to
       // HMS because some of them may already exist there. In that case, we load in the
       // catalog the partitions that already exist in HMS but aren't in the catalog yet.
-      if (hmsPartitionsToAdd.size() != addedHmsPartitions.size()) {
-        List<Partition> difference = computeDifference(hmsPartitionsToAdd,
+      if (allHmsPartitionsToAdd.size() != addedHmsPartitions.size()) {
+        List<Partition> difference = computeDifference(allHmsPartitionsToAdd,
             addedHmsPartitions);
         addedHmsPartitions.addAll(
             getPartitionsFromHms(msTbl, msClient, tableName, difference));
@@ -1982,8 +1985,8 @@ public class CatalogOpExecutor {
       // updated catalog version.
       addHdfsPartition(tbl, partition);
       }
-      return tbl;
     }
+    return tbl;
   }

   /**
@@ -2606,10 +2609,8 @@ public class CatalogOpExecutor {
     // Add partitions to metastore.
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
-      for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
-        int endPartitionIndex =
-            Math.min(i + MAX_PARTITION_UPDATES_PER_RPC, hmsPartitions.size());
-        List<Partition> hmsSublist = hmsPartitions.subList(i, endPartitionIndex);
+      for (List<Partition> hmsSublist :
+          Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
         // ifNotExists and needResults are true.
         List<Partition> hmsAddedPartitions =
             msClient.getHiveClient().add_partitions(hmsSublist, true, true);
@@ -2935,16 +2936,15 @@ public class CatalogOpExecutor {
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
-      for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
-        int endPartitionIndex =
-            Math.min(i + MAX_PARTITION_UPDATES_PER_RPC, hmsPartitions.size());
+      for (List<Partition> hmsPartitionsSubList :
+          Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
         try {
           // Alter partitions in bulk.
           MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, tableName,
-              hmsPartitions.subList(i, endPartitionIndex));
+              hmsPartitionsSubList);
           // Mark the corresponding HdfsPartition objects as dirty
           for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
-              hmsPartitions.subList(i, endPartitionIndex)) {
+              hmsPartitionsSubList) {
             try {
               catalog_.getHdfsPartition(dbName, tableName, msPartition).markDirty();
             } catch (PartitionNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 1935754..11fea8d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -44,7 +44,6 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.RuntimeEnv;
-import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.util.MetaStoreUtil;
@@ -306,24 +305,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
         "The specified cache pool does not exist: nonExistentTestPool");
     }
-    // Test the limit for the number of partitions
-    StringBuilder stmt = new StringBuilder("alter table functional.alltypes add");
-    int year;
-    int month;
-    for (int i = 0; i < CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC; ++i) {
-      year = i/12 + 2050;
-      month = i%12 + 1;
-      stmt.append(String.format(" partition(year=%d, month=%d)", year, month));
-    }
-    AnalyzesOk(stmt.toString());
-    // Over the limit by one partition
-    year = CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC/12 + 2050;
-    month = CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC%12 + 1;
-    stmt.append(String.format(" partition(year=%d, month=%d)", year, month));
-    AnalysisError(stmt.toString(),
-        String.format("One ALTER TABLE ADD PARTITION cannot add more than %d partitions.",
-        CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC));
-
     // If 'IF NOT EXISTS' is not used, ALTER TABLE ADD PARTITION cannot add a preexisting
     // partition to a table.
     AnalysisError("alter table functional.alltypes add partition(year=2050, month=1)" +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index feccdc5..d8c6000 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -17,6 +17,7 @@
 import getpass
 import pytest
+import re
 import time

 from test_ddl_base import TestDdlBase
@@ -24,7 +25,7 @@ from tests.common.impala_test_suite import LOG
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
-from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_LOCAL, IS_S3, IS_ADLS
+from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_S3, IS_ADLS

 # Validates DDL statements (create, drop)
 class TestDdlStatements(TestDdlBase):
@@ -366,6 +367,26 @@ class TestDdlStatements(TestDdlBase):
       "insert into table {0} partition(j=1, s='1') select 1".format(fq_tbl_name))
     assert '1' == self.execute_scalar("select count(*) from {0}".format(fq_tbl_name))

+  def test_alter_table_create_many_partitions(self, vector, unique_database):
+    """
+    Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC
+    batch size works, in that it creates all the underlying partitions.
+    """
+    self.client.execute(
+        "create table {0}.t(i int) partitioned by (p int)".format(unique_database))
+    MAX_PARTITION_UPDATES_PER_RPC = 500
+    alter_stmt = "alter table {0}.t add ".format(unique_database) + " ".join(
+        "partition(p=%d)" % (i,) for i in xrange(MAX_PARTITION_UPDATES_PER_RPC + 2))
+    self.client.execute(alter_stmt)
+    partitions = self.client.execute("show partitions {0}.t".format(unique_database))
+    # Show partitions will contain partition HDFS paths, which we expect to contain
+    # "p=val" subdirectories for each partition. The regexp finds all the "p=[0-9]*"
+    # paths, converts them to integers, and checks that we have all the ones we
+    # expect.
+    PARTITION_RE = re.compile("p=([0-9]+)")
+    assert map(int, PARTITION_RE.findall(str(partitions))) == \
+        range(MAX_PARTITION_UPDATES_PER_RPC + 2)
+
   def test_create_alter_tbl_properties(self, vector, unique_database):
     fq_tbl_name = unique_database + ".test_alter_tbl"
