IMPALA-4584: Make alter table operations on Kudu tables synchronous

This commit changes the behavior of alter table operations on Kudu
tables from asynchronous to synchronous. With this change, alter table
operations return when either the operations complete successfully or
a timeout is reached.

Change-Id: I385bce66691ae9040e72f97557e1bba31009e36b
Reviewed-on: http://gerrit.cloudera.org:8080/5364
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5bb9959f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5bb9959f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5bb9959f

Branch: refs/heads/master
Commit: 5bb9959fa4149f3b4976ab73c99b802b42e4fafc
Parents: 9f387c8
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Mon Dec 5 13:14:50 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Tue Dec 6 03:53:15 2016 +0000

----------------------------------------------------------------------
 .../impala/service/KuduCatalogOpExecutor.java   | 61 +++++++++++---------
 .../queries/QueryTest/kudu_alter.test           |  3 +-
 2 files changed, 36 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb9959f/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 8c978b4..0f8f8fd 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -288,11 +288,15 @@ public class KuduCatalogOpExecutor {
     Preconditions.checkState(!Strings.isNullOrEmpty(newName));
     AlterTableOptions alterTableOptions = new AlterTableOptions();
     alterTableOptions.renameTable(newName);
+    String errMsg = String.format("Error renaming Kudu table " +
+        "%s to %s", tbl.getKuduTableName(), newName);
     try (KuduClient client = 
KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
       client.alterTable(tbl.getKuduTableName(), alterTableOptions);
+      if (!client.isAlterTableDone(newName)) {
+        throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed 
out");
+      }
     } catch (KuduException e) {
-      throw new ImpalaRuntimeException(String.format("Error renaming Kudu 
table " +
-      "%s to %s", tbl.getName(), newName), e);
+      throw new ImpalaRuntimeException(errMsg, e);
     }
   }
 
@@ -316,15 +320,13 @@ public class KuduCatalogOpExecutor {
       alterTableOptions.dropRangePartition(lowerBound.first, upperBound.first,
           lowerBound.second, upperBound.second);
     }
-    try (KuduClient client = 
KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
-      client.alterTable(tbl.getKuduTableName(), alterTableOptions);
-    } catch (KuduException e) {
-      if (!params.isIgnore_errors()) {
-        throw new ImpalaRuntimeException(String.format("Error %s range 
partition in " +
-            "table %s",
-            (type == TRangePartitionOperationType.ADD ? "adding" : "dropping"),
-            tbl.getName()), e);
-      }
+    String errMsg = String.format("Error %s range partition in " +
+        "table %s", (type == TRangePartitionOperationType.ADD ? "adding" : 
"dropping"),
+        tbl.getName());
+    try {
+      alterKuduTable(tbl, alterTableOptions, errMsg);
+    } catch (ImpalaRuntimeException e) {
+      if (!params.isIgnore_errors()) throw e;
     }
   }
 
@@ -394,12 +396,8 @@ public class KuduCatalogOpExecutor {
         }
       }
     }
-    try (KuduClient client = 
KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
-      client.alterTable(tbl.getKuduTableName(), alterTableOptions);
-    } catch (KuduException e) {
-      throw new ImpalaRuntimeException("Error adding columns to Kudu table " +
-          tbl.getKuduTableName(), e);
-    }
+    String errMsg = "Error adding columns to Kudu table " + tbl.getName();
+    alterKuduTable(tbl, alterTableOptions, errMsg);
   }
 
   /**
@@ -410,12 +408,9 @@ public class KuduCatalogOpExecutor {
     Preconditions.checkState(!Strings.isNullOrEmpty(colName));
     AlterTableOptions alterTableOptions = new AlterTableOptions();
     alterTableOptions.dropColumn(colName);
-    try (KuduClient client = 
KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
-      client.alterTable(tbl.getKuduTableName(), alterTableOptions);
-    } catch (KuduException e) {
-      throw new ImpalaRuntimeException(String.format("Error dropping column %s 
from " +
-          "Kudu table %s", colName, tbl.getName()), e);
-    }
+    String errMsg = String.format("Error dropping column %s from " +
+        "Kudu table %s", colName, tbl.getName());
+    alterKuduTable(tbl, alterTableOptions, errMsg);
   }
 
   /**
@@ -427,11 +422,25 @@ public class KuduCatalogOpExecutor {
     Preconditions.checkNotNull(newCol);
     AlterTableOptions alterTableOptions = new AlterTableOptions();
     alterTableOptions.renameColumn(oldName, newCol.getColumnName());
+    String errMsg = String.format("Error renaming column %s to %s " +
+        "for Kudu table %s", oldName, newCol.getColumnName(), tbl.getName());
+    alterKuduTable(tbl, alterTableOptions, errMsg);
+  }
+
+  /**
+   * Alters a Kudu table based on the specified AlterTableOptions params. 
Blocks until
+   * the alter table operation is finished or until the operation timeout is 
reached.
+   * Throws an ImpalaRuntimeException if the operation cannot be completed 
successfully.
+   */
+  public static void alterKuduTable(KuduTable tbl, AlterTableOptions ato, 
String errMsg)
+      throws ImpalaRuntimeException {
     try (KuduClient client = 
KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
-      client.alterTable(tbl.getKuduTableName(), alterTableOptions);
+      client.alterTable(tbl.getKuduTableName(), ato);
+      if (!client.isAlterTableDone(tbl.getKuduTableName())) {
+        throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed 
out");
+      }
     } catch (KuduException e) {
-      throw new ImpalaRuntimeException(String.format("Error renaming column %s 
to %s " +
-          "for Kudu table %s", oldName, newCol.getColumnName(), 
tbl.getName()), e);
+      throw new ImpalaRuntimeException(errMsg, e);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5bb9959f/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test 
b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
index 505e9fe..116a7e9 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test
@@ -50,7 +50,6 @@ BIGINT
 # Create a table with range distribution
 create table tbl_to_alter (id int primary key, name string null, vali bigint 
not null)
   distribute by range (id) (partition 1 < values <= 10) stored as kudu
-  tblproperties('kudu.table_name'='tbl_to_alter')
 ---- RESULTS
 ====
 ---- QUERY
@@ -296,7 +295,7 @@ alter table tbl_to_alter set 
tblproperties('kudu.table_name'='kudu_tbl_to_alter'
 ---- QUERY
 # Create a new table and try to rename to an existing kudu table
 create table copy_of_tbl (a int primary key) distribute by hash (a) into 3 
buckets
-  stored as kudu;
+  stored as kudu tblproperties('kudu.table_name'='copy_of_tbl');
 alter table copy_of_tbl set 
tblproperties('kudu.table_name'='kudu_tbl_to_alter')
 ---- CATCH
 ImpalaRuntimeException: Error renaming Kudu table copy_of_tbl to 
kudu_tbl_to_alter

Reply via email to