IMPALA-4524: Batch ALTER TABLE...ADD PARTITION calls.

This commit allows users to add more than 500
(=MAX_PARTITION_UPDATES_PER_RPC) partitions in a single ALTER TABLE
command. We batch the operations against the Hive Metastore into
groups of 500.
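
For illustration only, here is a minimal, hypothetical sketch of the batching
approach (not the actual Impala code). It assumes Guava's Lists.partition,
which the change below also uses, plus a stand-in HmsClient interface:

    import java.util.List;
    import com.google.common.collect.Lists;

    class BatchingSketch {
      // Hypothetical stand-in for the metastore client call issued once per batch.
      interface HmsClient { void addPartitions(List<String> partitions); }

      static final int MAX_PARTITION_UPDATES_PER_RPC = 500;

      // Lists.partition() splits the full list into consecutive sublists of at
      // most 500 elements; one RPC is issued per sublist instead of one huge RPC.
      static void addPartitionsInBatches(HmsClient client, List<String> allPartitions) {
        for (List<String> batch :
            Lists.partition(allPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
          client.addPartitions(batch);
        }
      }
    }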

I tested this manually, creating 1002 partitions and observing the
expected 3 API calls against the Hive Metastore in the log. There is
also some coverage of this in existing tests. A new, simple test has
been added that confirms that creating 502 partitions works.
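
As a sanity check on the batch arithmetic (a sketch, not part of the commit):
1002 partitions split into batches of 500 yields three batches of sizes 500,
500 and 2, which matches the three metastore calls observed above.

    import java.util.Collections;
    import java.util.List;
    import com.google.common.collect.Lists;

    class BatchCountCheck {
      public static void main(String[] args) {
        List<Integer> parts = Collections.nCopies(1002, 0);
        List<List<Integer>> batches = Lists.partition(parts, 500);
        // Expect 3 batches; the last one holds the 2 leftover partitions.
        System.out.println(batches.size());                      // 3
        System.out.println(batches.get(batches.size() - 1).size()); // 2
      }
    }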

Change-Id: I95f8221ff08c0f126f951f7d37ff5e57985f855f
Reviewed-on: http://gerrit.cloudera.org:8080/8238
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/308bd5d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/308bd5d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/308bd5d5

Branch: refs/heads/master
Commit: 308bd5d58812470c6b760f889547ed90832eacb3
Parents: 6ae08ef
Author: Philip Zeyliger <[email protected]>
Authored: Mon Oct 9 11:49:13 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Oct 24 07:40:16 2017 +0000

----------------------------------------------------------------------
 .../analysis/AlterTableAddPartitionStmt.java    |  5 ---
 .../impala/service/CatalogOpExecutor.java       | 46 ++++++++++----------
 .../apache/impala/analysis/AnalyzeDDLTest.java  | 19 --------
 tests/metadata/test_ddl.py                      | 23 +++++++++-
 4 files changed, 45 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
index 151f245..59fdf2b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddPartitionStmt.java
@@ -88,11 +88,6 @@ public class AlterTableAddPartitionStmt extends AlterTableStmt {
       throw new AnalysisException("ALTER TABLE ADD PARTITION is not supported for " +
           "Kudu tables: " + table.getTableName());
     }
-    if (partitions_.size() > CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC) {
-      throw new AnalysisException(
-          String.format("One ALTER TABLE ADD PARTITION cannot add more than %d " +
-          "partitions.", CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC));
-    }
     Set<String> partitionSpecs = Sets.newHashSet();
     for (PartitionDef p: partitions_) {
       p.analyze(analyzer);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index da2c931..0ae9a25 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -1925,7 +1925,7 @@ public class CatalogOpExecutor {
     TableName tableName = tbl.getTableName();
     org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable().deepCopy();
     boolean ifNotExists = addPartParams.isIf_not_exists();
-    List<Partition> hmsPartitionsToAdd = Lists.newArrayList();
+    List<Partition> allHmsPartitionsToAdd = Lists.newArrayList();
     Map<List<String>, THdfsCachingOp> partitionCachingOpMap = Maps.newHashMap();
     for (TPartitionDef partParams: addPartParams.getPartitions()) {
       List<TPartitionKeyValue> partitionSpec = partParams.getPartition_spec();
@@ -1943,23 +1943,26 @@ public class CatalogOpExecutor {
 
       Partition hmsPartition = createHmsPartition(partitionSpec, msTbl, tableName,
           partParams.getLocation());
-      hmsPartitionsToAdd.add(hmsPartition);
+      allHmsPartitionsToAdd.add(hmsPartition);
 
       THdfsCachingOp cacheOp = partParams.getCache_op();
       if (cacheOp != null) partitionCachingOpMap.put(hmsPartition.getValues(), cacheOp);
     }
 
-    if (hmsPartitionsToAdd.isEmpty()) return null;
+    if (allHmsPartitionsToAdd.isEmpty()) return null;
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-      // Add partitions in bulk
-      List<Partition> addedHmsPartitions = null;
-      try {
-        addedHmsPartitions = msClient.getHiveClient().add_partitions(hmsPartitionsToAdd,
-            ifNotExists, true);
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
+      List<Partition> addedHmsPartitions = Lists.newArrayList();
+
+      for (List<Partition> hmsSublist :
+          Lists.partition(allHmsPartitionsToAdd, MAX_PARTITION_UPDATES_PER_RPC)) {
+        try {
+          addedHmsPartitions.addAll(msClient.getHiveClient().add_partitions(hmsSublist,
+              ifNotExists, true));
+        } catch (TException e) {
+          throw new ImpalaRuntimeException(
+              String.format(HMS_RPC_ERROR_FORMAT_STR, "add_partitions"), e);
+        }
       }
 
       // Handle HDFS cache. This is done in a separate round bacause we have to apply
@@ -1970,8 +1973,8 @@ public class CatalogOpExecutor {
       // If 'ifNotExists' is true, add_partitions() may fail to add all the partitions to
       // HMS because some of them may already exist there. In that case, we load in the
       // catalog the partitions that already exist in HMS but aren't in the catalog yet.
-      if (hmsPartitionsToAdd.size() != addedHmsPartitions.size()) {
-        List<Partition> difference = computeDifference(hmsPartitionsToAdd,
+      if (allHmsPartitionsToAdd.size() != addedHmsPartitions.size()) {
+        List<Partition> difference = computeDifference(allHmsPartitionsToAdd,
             addedHmsPartitions);
         addedHmsPartitions.addAll(
             getPartitionsFromHms(msTbl, msClient, tableName, difference));
@@ -1982,8 +1985,8 @@ public class CatalogOpExecutor {
         // updated catalog version.
         addHdfsPartition(tbl, partition);
       }
-      return tbl;
     }
+    return tbl;
   }
 
   /**
@@ -2606,10 +2609,8 @@ public class CatalogOpExecutor {
     // Add partitions to metastore.
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
-      for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
-        int endPartitionIndex =
-            Math.min(i + MAX_PARTITION_UPDATES_PER_RPC, hmsPartitions.size());
-        List<Partition> hmsSublist = hmsPartitions.subList(i, endPartitionIndex);
+      for (List<Partition> hmsSublist :
+          Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
         // ifNotExists and needResults are true.
         List<Partition> hmsAddedPartitions =
             msClient.getHiveClient().add_partitions(hmsSublist, true, true);
@@ -2935,16 +2936,15 @@ public class CatalogOpExecutor {
 
     try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
       // Apply the updates in batches of 'MAX_PARTITION_UPDATES_PER_RPC'.
-      for (int i = 0; i < hmsPartitions.size(); i += MAX_PARTITION_UPDATES_PER_RPC) {
-        int endPartitionIndex =
-            Math.min(i + MAX_PARTITION_UPDATES_PER_RPC, hmsPartitions.size());
+      for (List<Partition> hmsPartitionsSubList :
+        Lists.partition(hmsPartitions, MAX_PARTITION_UPDATES_PER_RPC)) {
         try {
           // Alter partitions in bulk.
           MetastoreShim.alterPartitions(msClient.getHiveClient(), dbName, tableName,
-              hmsPartitions.subList(i, endPartitionIndex));
+              hmsPartitionsSubList);
           // Mark the corresponding HdfsPartition objects as dirty
           for (org.apache.hadoop.hive.metastore.api.Partition msPartition:
-              hmsPartitions.subList(i, endPartitionIndex)) {
+              hmsPartitionsSubList) {
             try {
               catalog_.getHdfsPartition(dbName, tableName, msPartition).markDirty();
             } catch (PartitionNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 1935754..11fea8d 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -44,7 +44,6 @@ import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.FrontendTestBase;
 import org.apache.impala.common.RuntimeEnv;
-import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TDescribeTableParams;
 import org.apache.impala.util.MetaStoreUtil;
@@ -306,24 +305,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
           "The specified cache pool does not exist: nonExistentTestPool");
     }
 
-    // Test the limit for the number of partitions
-    StringBuilder stmt = new StringBuilder("alter table functional.alltypes add");
-    int year;
-    int month;
-    for (int i = 0; i < CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC; ++i) {
-      year = i/12 + 2050;
-      month = i%12 + 1;
-      stmt.append(String.format(" partition(year=%d, month=%d)", year, month));
-    }
-    AnalyzesOk(stmt.toString());
-    // Over the limit by one partition
-    year = CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC/12 + 2050;
-    month = CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC%12 + 1;
-    stmt.append(String.format(" partition(year=%d, month=%d)", year, month));
-    AnalysisError(stmt.toString(),
-        String.format("One ALTER TABLE ADD PARTITION cannot add more than %d partitions.",
-        CatalogOpExecutor.MAX_PARTITION_UPDATES_PER_RPC));
-
     // If 'IF NOT EXISTS' is not used, ALTER TABLE ADD PARTITION cannot add a preexisting
     // partition to a table.
     AnalysisError("alter table functional.alltypes add partition(year=2050, month=1)" +

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/308bd5d5/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index feccdc5..d8c6000 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -17,6 +17,7 @@
 
 import getpass
 import pytest
+import re
 import time
 
 from test_ddl_base import TestDdlBase
@@ -24,7 +25,7 @@ from tests.common.impala_test_suite import LOG
 from tests.common.parametrize import UniqueDatabase
 from tests.common.skip import SkipIf, SkipIfADLS, SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
-from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_LOCAL, IS_S3, IS_ADLS
+from tests.util.filesystem_utils import WAREHOUSE, IS_HDFS, IS_S3, IS_ADLS
 
 # Validates DDL statements (create, drop)
 class TestDdlStatements(TestDdlBase):
@@ -366,6 +367,26 @@ class TestDdlStatements(TestDdlBase):
         "insert into table {0} partition(j=1, s='1') select 
1".format(fq_tbl_name))
     assert '1' == self.execute_scalar("select count(*) from {0}".format(fq_tbl_name))
 
+  def test_alter_table_create_many_partitions(self, vector, unique_database):
+    """
+    Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC
+    batch size works, in that it creates all the underlying partitions.
+    """
+    self.client.execute(
+        "create table {0}.t(i int) partitioned by (p 
int)".format(unique_database))
+    MAX_PARTITION_UPDATES_PER_RPC = 500
+    alter_stmt = "alter table {0}.t add ".format(unique_database) + " ".join(
+        "partition(p=%d)" % (i,) for i in xrange(MAX_PARTITION_UPDATES_PER_RPC 
+ 2))
+    self.client.execute(alter_stmt)
+    partitions = self.client.execute("show partitions {0}.t".format(unique_database))
+    # Show partitions will contain partition HDFS paths, which we expect to contain
+    # "p=val" subdirectories for each partition. The regexp finds all the "p=[0-9]*"
+    # paths, converts them to integers, and checks that we have all the ones we
+    # expect.
+    PARTITION_RE = re.compile("p=([0-9]+)")
+    assert map(int, PARTITION_RE.findall(str(partitions))) == \
+        range(MAX_PARTITION_UPDATES_PER_RPC + 2)
+
   def test_create_alter_tbl_properties(self, vector, unique_database):
     fq_tbl_name = unique_database + ".test_alter_tbl"
 
