IMPALA-5167: Reduce the number of Kudu clients created (FE)

Creating Kudu clients is very expensive as each will fetch
metadata from the Kudu master, so we should minimize the
number of Kudu clients that get created.

This patch stores a map from Kudu master addresses to Kudu
clients in KuduUtil to be used across the FE and catalog.
Another patch has already addressed the BE.

Future work will consider providing a way to invalidate
the stored Kudu clients in case something goes wrong
(IMPALA-5685)

This relies on two changes on the Kudu side: one that clears
non-covered range entries from the client's cache on table
open (d07ecd6ded01201c912d2e336611a6a941f48d98), and one
that automatically refreshes auth tokens when they expire
(603c1578c78c0377ffafdd9c427ebfd8a206bda3).

This patch disables some tests that no longer work as
they relied on Kudu metadata loading operations timing out,
but since we're reusing clients the metadata is already
loaded when the test is run.

Testing:
- Ran a stress test on a 10 node cluster: scan of a small
  Kudu table, 1000 concurrent queries, load on the Kudu
  master was reduced significantly, from ~50% CPU to ~5%.
  (with the BE changes included)
- Ran the Kudu e2e tests.
- Manually ran a test with concurrent INSERTs and
  'ALTER TABLE ADD PARTITION' (which is affected by the
  Kudu side change mentioned above) and verified
  correctness.

Change-Id: I9b0b346f37ee43f7f0eefe34a093eddbbdcf2a5e
Reviewed-on: http://gerrit.cloudera.org:8080/6898
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/399b184b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/399b184b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/399b184b

Branch: refs/heads/master
Commit: 399b184bbcf5a1fb06b5afbebf9062e69d02beed
Parents: 322ccb0
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Tue May 16 09:37:03 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Jul 21 21:49:04 2017 +0000

----------------------------------------------------------------------
 .../org/apache/impala/catalog/KuduTable.java    |  9 ++++--
 .../org/apache/impala/planner/KuduScanNode.java |  3 +-
 .../impala/service/KuduCatalogOpExecutor.java   | 18 +++++++----
 .../java/org/apache/impala/util/KuduUtil.java   | 32 +++++++++++++-------
 .../QueryTest/kudu-timeouts-catalogd.test       | 12 +++++---
 .../QueryTest/kudu-timeouts-impalad.test        | 12 +++++---
 6 files changed, 55 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java 
b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index a79ffe0..cb94503 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -204,7 +204,8 @@ public class KuduTable extends Table {
     setTableStats(msTable_);
 
     // Connect to Kudu to retrieve table metadata
-    try (KuduClient kuduClient = 
KuduUtil.createKuduClient(getKuduMasterHosts())) {
+    KuduClient kuduClient = KuduUtil.getKuduClient(getKuduMasterHosts());
+    try {
       kuduTable = kuduClient.openTable(kuduTableName_);
     } catch (KuduException e) {
       throw new TableLoadingException(String.format(
@@ -389,7 +390,8 @@ public class KuduTable extends Table {
     resultSchema.addToColumns(new TColumn("Leader Replica", 
Type.STRING.toThrift()));
     resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift()));
 
-    try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts());
+    try {
       org.apache.kudu.client.KuduTable kuduTable = 
client.openTable(kuduTableName_);
       List<LocatedTablet> tablets =
           
kuduTable.getTabletsLocations(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
@@ -432,7 +434,8 @@ public class KuduTable extends Table {
     // Build column header
     String header = "RANGE (" + 
Joiner.on(',').join(getRangePartitioningColNames()) + ")";
     resultSchema.addToColumns(new TColumn(header, Type.STRING.toThrift()));
-    try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(getKuduMasterHosts());
+    try {
       org.apache.kudu.client.KuduTable kuduTable = 
client.openTable(kuduTableName_);
       // The Kudu table API will return the partitions in sorted order by 
value.
       List<String> partitions = kuduTable.getFormattedRangePartitions(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 4687129..57403e4 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -111,7 +111,8 @@ public class KuduScanNode extends ScanNode {
   public void init(Analyzer analyzer) throws ImpalaRuntimeException {
     conjuncts_ = orderConjunctsByCost(conjuncts_);
 
-    try (KuduClient client = 
KuduUtil.createKuduClient(kuduTable_.getKuduMasterHosts())) {
+    KuduClient client = 
KuduUtil.getKuduClient(kuduTable_.getKuduMasterHosts());
+    try {
       org.apache.kudu.client.KuduTable rpcTable =
           client.openTable(kuduTable_.getKuduTableName());
       validateSchema(rpcTable);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 9984239..cbbfccf 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -76,7 +76,8 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Creating table '%s' in master '%s'", 
kuduTableName,
           masterHosts));
     }
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure 
atomicity.
       // (see KUDU-1710).
       if (kudu.tableExists(kuduTableName)) {
@@ -213,7 +214,8 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Dropping table '%s' from master '%s'", 
tableName,
           masterHosts));
     }
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
       // TODO: The IF EXISTS case should be handled by Kudu to ensure 
atomicity.
       // (see KUDU-1710).
@@ -244,7 +246,8 @@ public class KuduCatalogOpExecutor {
       LOG.trace(String.format("Loading schema of table '%s' from master '%s'",
           kuduTableName, masterHosts));
     }
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       if (!kudu.tableExists(kuduTableName)) {
         throw new ImpalaRuntimeException(String.format("Table does not exist 
in Kudu: " +
             "'%s'", kuduTableName));
@@ -286,7 +289,8 @@ public class KuduCatalogOpExecutor {
     Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
     String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
     Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
-    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
+    KuduClient kudu = KuduUtil.getKuduClient(masterHosts);
+    try {
       kudu.tableExists(kuduTableName);
     } catch (Exception e) {
       // TODO: This is misleading when there are other errors, e.g. timeouts.
@@ -305,7 +309,8 @@ public class KuduCatalogOpExecutor {
     alterTableOptions.renameTable(newName);
     String errMsg = String.format("Error renaming Kudu table " +
         "%s to %s", tbl.getKuduTableName(), newName);
-    try (KuduClient client = 
KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts());
+    try {
       client.alterTable(tbl.getKuduTableName(), alterTableOptions);
       if (!client.isAlterTableDone(newName)) {
         throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed 
out");
@@ -475,7 +480,8 @@ public class KuduCatalogOpExecutor {
    */
   public static void alterKuduTable(KuduTable tbl, AlterTableOptions ato, 
String errMsg)
       throws ImpalaRuntimeException {
-    try (KuduClient client = 
KuduUtil.createKuduClient(tbl.getKuduMasterHosts())) {
+    KuduClient client = KuduUtil.getKuduClient(tbl.getKuduMasterHosts());
+    try {
       client.alterTable(tbl.getKuduTableName(), ato);
       if (!client.isAlterTableDone(tbl.getKuduTableName())) {
         throw new ImpalaRuntimeException(errMsg + ": Kudu operation timed 
out");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java 
b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index be98cf6..4df8005 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -20,6 +20,7 @@ package org.apache.impala.util;
 import static java.lang.String.format;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.impala.analysis.Analyzer;
 import org.apache.impala.analysis.DescriptorTable;
@@ -55,6 +56,7 @@ import org.apache.kudu.client.RangePartitionBound;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class KuduUtil {
 
@@ -66,19 +68,27 @@ public class KuduUtil {
   // be sufficient for the Frontend/Catalog use, and has been tested in stress 
tests.
   private static int KUDU_CLIENT_WORKER_THREAD_COUNT = 5;
 
+  // Maps lists of master addresses to KuduClients, for sharing clients across 
the FE.
+  private static Map<String, KuduClient> kuduClients_ = Maps.newHashMap();
+
   /**
-   * Creates a KuduClient with the specified Kudu master addresses (as a 
comma-separated
-   * list of host:port pairs). The 'admin operation timeout' and the 
'operation timeout'
-   * are set to BackendConfig.getKuduClientTimeoutMs(). The 'admin operations 
timeout' is
-   * used for operations like creating/deleting tables. The 'operation 
timeout' is used
-   * when fetching tablet metadata.
+   * Gets a KuduClient for the specified Kudu master addresses (as a 
comma-separated
+   * list of host:port pairs). It will look up and share an existing 
KuduClient, if
+   * possible, or it will create a new one to return.
+   * The 'admin operation timeout' and the 'operation timeout' are set to
+   * BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is 
used for
+   * operations like creating/deleting tables. The 'operation timeout' is used 
when
+   * fetching tablet metadata.
    */
-  public static KuduClient createKuduClient(String kuduMasters) {
-    KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
-    
b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
-    
b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
-    b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
-    return b.build();
+  public static KuduClient getKuduClient(String kuduMasters) {
+    if (!kuduClients_.containsKey(kuduMasters)) {
+      KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
+      
b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
+      
b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
+      b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
+      kuduClients_.put(kuduMasters, b.build());
+    }
+    return kuduClients_.get(kuduMasters);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
 
b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
index d811cfd..63af18d 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
@@ -1,3 +1,10 @@
+# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685)
+#====
+#---- QUERY
+#show create table functional_kudu.alltypestiny
+#---- CATCH
+#Error opening Kudu table 'impala::functional_kudu.alltypestiny'
+#====
 ====
 ---- QUERY
 # TODO: improve error messages (here and below) when KUDU-1734 is resolved
@@ -6,11 +13,6 @@ describe functional_kudu.alltypes
 Error opening Kudu table 'impala::functional_kudu.alltypes'
 ====
 ---- QUERY
-show create table functional_kudu.alltypes
----- CATCH
-Error opening Kudu table 'impala::functional_kudu.alltypes'
-====
----- QUERY
 create table test_kudu (x int primary key)
 partition by hash(x) partitions 3 stored as kudu
 ---- CATCH

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/399b184b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
 
b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
index ba3341e..cde4df5 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
@@ -1,3 +1,10 @@
+# TODO: enable this once we have a way to invalidate kudu clients (IMPALA-5685)
+#====
+#---- QUERY
+#show table stats functional_kudu.alltypestiny
+#---- CATCH
+#Error accessing Kudu for table stats
+#====
 ====
 ---- QUERY
 # Expected timeout while planning the scan node.
@@ -6,8 +13,3 @@ select * from functional_kudu.alltypes
 ---- CATCH
 Unable to initialize the Kudu scan node
 ====
----- QUERY
-show table stats functional_kudu.alltypes
----- CATCH
-Error accessing Kudu for table stats
-====

Reply via email to