Repository: incubator-impala
Updated Branches:
  refs/heads/hadoop-next eae4f307d -> 32dfcebfa


IMPALA-3771: Expose Kudu client timeout and set a default

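The Kudu client timeout was too low for Impala usage. This change sets
the default timeout to 3 minutes and exposes it as a single gflag,
--kudu_operation_timeout_ms, which replaces the separate
kudu_scanner_timeout_sec and kudu_session_timeout_seconds flags. The
value is applied to Kudu scanners and sessions in the backend, and is
passed to the frontend and catalog for Kudu client admin and metadata
operations. It must be a positive value; timeouts cannot be disabled.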

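New tests cover Kudu timeouts in both impalad and catalogd
(kudu-timeouts-impalad.test, kudu-timeouts-catalogd.test and
tests/custom_cluster/test_kudu.py).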

Change-Id: Iad95e8e38aad4f76d21bac6879db6c02b3c3e045
Reviewed-on: http://gerrit.cloudera.org:8080/4849
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/50f7753d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/50f7753d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/50f7753d

Branch: refs/heads/hadoop-next
Commit: 50f7753d2bf3bde0ea67319b680806a071d69871
Parents: 3e18755
Author: Matthew Jacobs <[email protected]>
Authored: Wed Oct 26 17:25:30 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Nov 5 06:43:45 2016 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog.cc                       |  6 ++-
 be/src/common/global-flags.cc                   |  8 ++++
 be/src/exec/kudu-scan-node.cc                   |  7 +---
 be/src/exec/kudu-scan-node.h                    |  4 +-
 be/src/exec/kudu-scanner.cc                     |  5 +--
 be/src/exec/kudu-table-sink.cc                  | 14 ++-----
 be/src/exec/kudu-table-sink.h                   |  8 ++--
 be/src/exec/kudu-util.cc                        | 11 +++++
 be/src/exec/kudu-util.h                         |  5 +++
 be/src/service/frontend.cc                      |  7 +++-
 .../org/apache/impala/catalog/KuduTable.java    | 43 +++++++++-----------
 .../org/apache/impala/planner/KuduScanNode.java |  5 +--
 .../apache/impala/service/BackendConfig.java    | 18 +++++++-
 .../org/apache/impala/service/JniCatalog.java   | 18 ++++----
 .../org/apache/impala/service/JniFrontend.java  |  4 +-
 .../impala/service/KuduCatalogOpExecutor.java   | 32 ++++++++-------
 .../java/org/apache/impala/util/KuduUtil.java   | 30 ++++++++++----
 .../QueryTest/kudu-timeouts-catalogd.test       | 25 ++++++++++++
 .../QueryTest/kudu-timeouts-impalad.test        | 13 ++++++
 tests/common/impala_test_suite.py               |  3 +-
 tests/custom_cluster/test_kudu.py               | 24 +++++++++++
 21 files changed, 199 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index 9e7af4e..7d6fdca 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -42,10 +42,11 @@ DEFINE_string(sentry_config, "", "Local path to a 
sentry-site.xml configuration
     "file. If set, authorization will be enabled.");
 
 DECLARE_int32(non_impala_java_vlog);
+DECLARE_int32(kudu_operation_timeout_ms);
 
 Catalog::Catalog() {
   JniMethodDescriptor methods[] = {
-    {"<init>", 
"(ZILjava/lang/String;IIZLjava/lang/String;Ljava/lang/String;)V",
+    {"<init>", 
"(ZILjava/lang/String;IIZLjava/lang/String;Ljava/lang/String;I)V",
         &catalog_ctor_},
     {"updateCatalog", "([B)[B", &update_metastore_id_},
     {"execDdl", "([B)[B", &exec_ddl_id_},
@@ -77,10 +78,11 @@ Catalog::Catalog() {
   jboolean auth_to_local = FLAGS_load_auth_to_local_rules && 
!FLAGS_principal.empty();
   jstring principal = jni_env->NewStringUTF(FLAGS_principal.c_str());
   jstring local_library_dir = 
jni_env->NewStringUTF(FLAGS_local_library_dir.c_str());
+  jint kudu_operation_timeout = FLAGS_kudu_operation_timeout_ms;
   jobject catalog = jni_env->NewObject(catalog_class_, catalog_ctor_,
       load_in_background, num_metadata_loading_threads, sentry_config,
       FlagToTLogLevel(FLAGS_v), FlagToTLogLevel(FLAGS_non_impala_java_vlog),
-      auth_to_local, principal, local_library_dir);
+      auth_to_local, principal, local_library_dir, kudu_operation_timeout);
   EXIT_IF_EXC(jni_env);
   ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, catalog, &catalog_));
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index c67ec2c..f75236c 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -108,8 +108,16 @@ DEFINE_int32(fault_injection_rpc_type, 0, "A fault 
injection option that specifi
     "which rpc call will be injected with the delay. Effective in debug builds 
only.");
 #endif
 
+// Used for testing the path where the Kudu client is stubbed.
 DEFINE_bool(disable_kudu, false, "If true, Kudu features will be disabled.");
 
+// Timeout (ms) used in the FE for admin and metadata operations (set on the 
KuduClient),
+// and in the BE for scans and writes (set on the KuduScanner and KuduSession
+// accordingly).
+DEFINE_int32(kudu_operation_timeout_ms, 3 * 60 * 1000, "Timeout (milliseconds) 
set for "
+    "all Kudu operations. This must be a positive value, and there is no way 
to disable "
+    "timeouts.");
+
 DEFINE_bool(enable_accept_queue_server, true,
     "If true, uses a modified version of "
     "TThreadedServer that accepts connections as quickly as possible and hands 
them off "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 1aef102..f98077e 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -121,12 +121,7 @@ Status KuduScanNode::Open(RuntimeState* state) {
   const KuduTableDescriptor* table_desc =
       static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc());
 
-  kudu::client::KuduClientBuilder b;
-  for (const string& address: table_desc->kudu_master_addresses()) {
-    b.add_master_server_addr(address);
-  }
-
-  KUDU_RETURN_IF_ERROR(b.Build(&client_), "Unable to create Kudu client");
+  RETURN_IF_ERROR(CreateKuduClient(table_desc->kudu_master_addresses(), 
&client_));
 
   uint64_t latest_ts = static_cast<uint64_t>(
       max<int64_t>(0, state->query_ctx().session.kudu_latest_observed_ts));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 79ed480..84e2531 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -65,8 +65,8 @@ class KuduScanNode : public ScanNode {
   const TupleDescriptor* tuple_desc_;
 
   /// The Kudu client and table. Scanners share these instances.
-  std::tr1::shared_ptr<kudu::client::KuduClient> client_;
-  std::tr1::shared_ptr<kudu::client::KuduTable> table_;
+  kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
+  kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
 
   /// Set of scan tokens to be deserialized into Kudu scanners.
   std::vector<std::string> scan_tokens_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index d230985..c13b6a8 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -49,8 +49,7 @@ DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15,
     "The period at which Kudu Scanners should send keep-alive requests to the 
tablet "
     "server to ensure that scanners do not time out.");
 
-DEFINE_int32(kudu_scanner_timeout_sec, 60,
-             "The timeout used for Kudu Scan requests.");
+DECLARE_int32(kudu_operation_timeout_ms);
 
 namespace impala {
 
@@ -134,7 +133,7 @@ Status KuduScanner::OpenNextScanToken(const string& 
scan_token)  {
                          "Could not set replica selection.");
   }
 
-  
KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_scanner_timeout_sec 
* 1000),
+  
KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms),
       "Could not set scanner timeout");
 
   {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index fc26a53..14b1889 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -31,11 +31,11 @@
 
 #include "common/names.h"
 
-DEFINE_int32(kudu_session_timeout_seconds, 60, "Timeout set on the Kudu 
session. "
-    "How long to wait before considering a write failed.");
 DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) 
of the "
     "Kudu client buffer for mutations.");
 
+DECLARE_int32(kudu_operation_timeout_ms);
+
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
 using kudu::client::KuduClient;
@@ -112,19 +112,13 @@ Status KuduTableSink::Prepare(RuntimeState* state, 
MemTracker* mem_tracker) {
 
 Status KuduTableSink::Open(RuntimeState* state) {
   RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
-
-  kudu::client::KuduClientBuilder b;
-  for (const string& address: table_desc_->kudu_master_addresses()) {
-    b.add_master_server_addr(address);
-  }
-
-  KUDU_RETURN_IF_ERROR(b.Build(&client_), "Unable to create Kudu client");
+  RETURN_IF_ERROR(CreateKuduClient(table_desc_->kudu_master_addresses(), 
&client_));
 
   KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_),
       "Unable to open Kudu table");
 
   session_ = client_->NewSession();
-  session_->SetTimeoutMillis(FLAGS_kudu_session_timeout_seconds * 1000);
+  session_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms);
 
   // KuduSession Set* methods here and below return a status for API 
compatibility.
   // As long as the Kudu client is statically linked, these shouldn't fail and 
thus these

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index fe278c5..c37b15d 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -100,10 +100,10 @@ class KuduTableSink : public DataSink {
   std::vector<ExprContext*> output_expr_ctxs_;
 
   /// The Kudu client, table and session.
-  /// This uses 'std::tr1::shared_ptr' as that is the type expected by Kudu.
-  std::tr1::shared_ptr<kudu::client::KuduClient> client_;
-  std::tr1::shared_ptr<kudu::client::KuduTable> table_;
-  std::tr1::shared_ptr<kudu::client::KuduSession> session_;
+  /// This uses 'kudu::client::sp::shared_ptr' as that is the type expected by 
Kudu.
+  kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_;
+  kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_;
+  kudu::client::sp::shared_ptr<kudu::client::KuduSession> session_;
 
   /// Used to specify the type of write operation (INSERT/UPDATE/DELETE).
   TSinkAction::type sink_action_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-util.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index aedf229..5afa0a4 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -28,9 +28,12 @@
 #include "common/status.h"
 
 using kudu::client::KuduSchema;
+using kudu::client::KuduClient;
+using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
 
 DECLARE_bool(disable_kudu);
+DECLARE_int32(kudu_operation_timeout_ms);
 
 namespace impala {
 
@@ -54,6 +57,14 @@ Status CheckKuduAvailability() {
   return Status(TErrorCode::KUDU_NOT_SUPPORTED_ON_OS);
 }
 
+Status CreateKuduClient(const vector<string>& master_addrs,
+    kudu::client::sp::shared_ptr<KuduClient>* client) {
+  kudu::client::KuduClientBuilder b;
+  for (const string& address: master_addrs) b.add_master_server_addr(address);
+  KUDU_RETURN_IF_ERROR(b.Build(client), "Unable to create Kudu client");
+  return Status::OK();
+}
+
 string KuduSchemaDebugString(const KuduSchema& schema) {
   stringstream ss;
   for (int i = 0; i < schema.num_columns(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index 7957f7a..7774812 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -37,6 +37,11 @@ Status CheckKuduAvailability();
 /// Convenience function for the bool equivalent of CheckKuduAvailability().
 bool KuduIsAvailable();
 
+/// Creates a new KuduClient using the specified master adresses. If any error 
occurs,
+/// 'client' is not set and an error status is returned.
+Status CreateKuduClient(const std::vector<std::string>& master_addrs,
+    kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client);
+
 /// Returns a debug string for the KuduSchema.
 std::string KuduSchemaDebugString(const kudu::client::KuduSchema& schema);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/service/frontend.cc
----------------------------------------------------------------------
diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc
index ca8ecbb..f8cecfb 100644
--- a/be/src/service/frontend.cc
+++ b/be/src/service/frontend.cc
@@ -33,6 +33,7 @@ DECLARE_string(sentry_config);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_bool(load_auth_to_local_rules);
 DECLARE_string(principal);
+DECLARE_int32(kudu_operation_timeout_ms);
 
 DEFINE_bool(load_catalog_at_startup, false, "if true, load all catalog data at 
startup");
 
@@ -64,7 +65,7 @@ DEFINE_string(kudu_master_hosts, "", "Specifies the default 
Kudu master(s). The
 Frontend::Frontend() {
   JniMethodDescriptor methods[] = {
     {"<init>", "(ZLjava/lang/String;Ljava/lang/String;Ljava/lang/String;"
-        "Ljava/lang/String;IIZLjava/lang/String;)V", &fe_ctor_},
+        "Ljava/lang/String;IIZLjava/lang/String;I)V", &fe_ctor_},
     {"createExecRequest", "([B)[B", &create_exec_request_id_},
     {"getExplainPlan", "([B)Ljava/lang/String;", &get_explain_plan_id_},
     {"getHadoopConfig", "([B)[B", &get_hadoop_config_id_},
@@ -114,9 +115,11 @@ Frontend::Frontend() {
   // and impala is kerberized.
   jboolean auth_to_local = FLAGS_load_auth_to_local_rules && 
!FLAGS_principal.empty();
   jstring kudu_master_hosts = 
jni_env->NewStringUTF(FLAGS_kudu_master_hosts.c_str());
+  jint kudu_operation_timeout = FLAGS_kudu_operation_timeout_ms;
   jobject fe = jni_env->NewObject(fe_class_, fe_ctor_, lazy, server_name,
       policy_file_path, sentry_config, auth_provider_class, 
FlagToTLogLevel(FLAGS_v),
-      FlagToTLogLevel(FLAGS_non_impala_java_vlog), auth_to_local, 
kudu_master_hosts);
+      FlagToTLogLevel(FLAGS_non_impala_java_vlog), auth_to_local, 
kudu_master_hosts,
+      kudu_operation_timeout);
   EXIT_IF_EXC(jni_env);
   ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, fe, &fe_));
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java 
b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index a7a5933..6c30077 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -23,10 +23,15 @@ import java.util.Set;
 
 import javax.xml.bind.DatatypeConverter;
 
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.DistributeParam;
-import org.apache.impala.analysis.ToSqlUtils;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.service.CatalogOpExecutor;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TDistributeByHashParam;
@@ -40,27 +45,21 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.TResultRowBuilder;
-import org.apache.impala.service.CatalogOpExecutor;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.PartitionSchema;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import org.apache.kudu.client.PartitionSchema.RangeSchema;
-import org.apache.kudu.client.PartitionSchema;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
 /**
  * Representation of a Kudu table in the catalog cache.
  */
@@ -94,8 +93,6 @@ public class KuduTable extends Table {
   // Key to specify the number of tablet replicas.
   public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas";
 
-  public static final long KUDU_RPC_TIMEOUT_MS = 50000;
-
   // Table name in the Kudu storage engine. It may not neccessarily be the 
same as the
   // table name specified in the CREATE TABLE statement; the latter
   // is stored in Table.name_. Reasons why KuduTable.kuduTableName_ and 
Table.name_ may
@@ -172,12 +169,12 @@ public class KuduTable extends Table {
     numRows_ = getRowCount(msTable_.getParameters());
 
     // Connect to Kudu to retrieve table metadata
-    try (KuduClient kuduClient = new KuduClient.KuduClientBuilder(
-        getKuduMasterHosts()).build()) {
+    try (KuduClient kuduClient = 
KuduUtil.createKuduClient(getKuduMasterHosts())) {
       kuduTable = kuduClient.openTable(kuduTableName_);
     } catch (KuduException e) {
-      LOG.error("Error accessing Kudu table " + kuduTableName_);
-      throw new TableLoadingException(e.getMessage());
+      throw new TableLoadingException(String.format(
+          "Error opening Kudu table '%s', Kudu error: %s",
+          kuduTableName_, e.getMessage()));
     }
     Preconditions.checkNotNull(kuduTable);
 
@@ -187,7 +184,6 @@ public class KuduTable extends Table {
       loadDistributeByParams(kuduTable);
       loadAllColumnStats(msClient);
     } catch (ImpalaRuntimeException e) {
-      LOG.error("Error loading metadata for Kudu table: " + kuduTableName_);
       throw new TableLoadingException("Error loading metadata for Kudu table " 
+
           kuduTableName_, e);
     }
@@ -341,11 +337,10 @@ public class KuduTable extends Table {
     resultSchema.addToColumns(new TColumn("Leader Replica", 
Type.STRING.toThrift()));
     resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift()));
 
-    try (KuduClient client = new KuduClient.KuduClientBuilder(
-        getKuduMasterHosts()).build()) {
+    try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) {
       org.apache.kudu.client.KuduTable kuduTable = 
client.openTable(kuduTableName_);
       List<LocatedTablet> tablets =
-          kuduTable.getTabletsLocations(KUDU_RPC_TIMEOUT_MS);
+          
kuduTable.getTabletsLocations(BackendConfig.getKuduClientTimeoutMs());
       for (LocatedTablet tab: tablets) {
         TResultRowBuilder builder = new TResultRowBuilder();
         builder.add("-1");   // The Kudu client API doesn't expose tablet row 
counts.
@@ -366,7 +361,7 @@ public class KuduTable extends Table {
       }
 
     } catch (Exception e) {
-      throw new ImpalaRuntimeException("Could not communicate with Kudu.", e);
+      throw new ImpalaRuntimeException("Error accessing Kudu for table 
stats.", e);
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 7ef1c20..6f20abd 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -44,10 +44,10 @@ import org.apache.impala.thrift.TPlanNodeType;
 import org.apache.impala.thrift.TScanRange;
 import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
+import org.apache.impala.util.KuduUtil;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.KuduClient.KuduClientBuilder;
 import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.KuduPredicate.ComparisonOp;
 import org.apache.kudu.client.KuduScanToken;
@@ -106,8 +106,7 @@ public class KuduScanNode extends ScanNode {
     analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_);
     conjuncts_ = orderConjunctsByCost(conjuncts_);
 
-    try (KuduClient client =
-         new KuduClientBuilder(kuduTable_.getKuduMasterHosts()).build()) {
+    try (KuduClient client = 
KuduUtil.createKuduClient(kuduTable_.getKuduMasterHosts())) {
       org.apache.kudu.client.KuduTable rpcTable =
           client.openTable(kuduTable_.getKuduTableName());
       validateSchema(rpcTable);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 75dd1f7..c90f674 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.service;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class is meant to provide the FE with impalad backend configuration 
parameters,
  * including command line arguments.
@@ -31,10 +33,15 @@ public class BackendConfig {
   // the default FLAGS_read_size used by the IO manager in the backend.
   private final long READ_SIZE;
 
-  // This is overriden by JniFrontend/JniCatalog classes with user set 
configuration.
-  // TODO: Read this from backend instead of using static variables.
+  // Variables below are overriden by JniFrontend/JniCatalog with user set 
configuration.
+  // TODO: Read from backend instead of using static variables.
+
+  // Determines how principal to short name conversion works. See User.java 
for more info.
   private static boolean allowAuthToLocalRules_ = false;
 
+  // Kudu client timeout (ms).
+  private static int kuduOperationTimeoutMs_ = 3 * 60 * 1000;
+
   private BackendConfig() {
     // TODO: Populate these by making calls to the backend instead of default 
constants.
     READ_SIZE = 8 * 1024 * 1024L;
@@ -46,4 +53,11 @@ public class BackendConfig {
   public static void setAuthToLocal(boolean authToLocal) {
     allowAuthToLocalRules_ = authToLocal;
   }
+
+  public static int getKuduClientTimeoutMs() { return kuduOperationTimeoutMs_; 
}
+
+  public static void setKuduClientTimeoutMs(int kuduOperationTimeoutMs) {
+    Preconditions.checkArgument(kuduOperationTimeoutMs > 0);
+    BackendConfig.kuduOperationTimeoutMs_ = kuduOperationTimeoutMs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java 
b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index b35877f..7993191 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -21,12 +21,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.thrift.TException;
-import org.apache.thrift.TSerializer;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.CatalogException;
@@ -36,7 +30,6 @@ import org.apache.impala.catalog.Function;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.JniUtil;
-import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
@@ -56,6 +49,12 @@ import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.util.GlogAppender;
 import org.apache.impala.util.PatternMatcher;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -81,9 +80,10 @@ public class JniCatalog {
 
   public JniCatalog(boolean loadInBackground, int numMetadataLoadingThreads,
       String sentryServiceConfig, int impalaLogLevel, int otherLogLevel,
-      boolean allowAuthToLocal, String kerberosPrincipal, String 
localLibraryPath)
-      throws InternalException {
+      boolean allowAuthToLocal, String kerberosPrincipal, String 
localLibraryPath,
+      int kuduOperationTimeoutMs) throws InternalException {
     BackendConfig.setAuthToLocal(allowAuthToLocal);
+    BackendConfig.setKuduClientTimeoutMs(kuduOperationTimeoutMs);
     Preconditions.checkArgument(numMetadataLoadingThreads > 0);
     // This trick saves having to pass a TLogLevel enum, which is an object 
and more
     // complex to pass through JNI.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/JniFrontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java 
b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 0d502e5..3ef89e3 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -117,9 +117,11 @@ public class JniFrontend {
    */
   public JniFrontend(boolean lazy, String serverName, String 
authorizationPolicyFile,
       String sentryConfigFile, String authPolicyProviderClass, int 
impalaLogLevel,
-      int otherLogLevel, boolean allowAuthToLocal, String 
defaultKuduMasterHosts)
+      int otherLogLevel, boolean allowAuthToLocal, String 
defaultKuduMasterHosts,
+      int kuduOperationTimeoutMs)
       throws InternalException {
     BackendConfig.setAuthToLocal(allowAuthToLocal);
+    BackendConfig.setKuduClientTimeoutMs(kuduOperationTimeoutMs);
     GlogAppender.Install(TLogLevel.values()[impalaLogLevel],
         TLogLevel.values()[otherLogLevel]);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git 
a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index 18043c2..a2b1fb9 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.service;
 
-import java.lang.NumberFormatException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -25,9 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
@@ -39,8 +35,8 @@ import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.TDistributeParam;
 import org.apache.impala.thrift.TRangePartition;
 import org.apache.impala.util.KuduUtil;
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.CreateTableOptions;
 import org.apache.kudu.client.KuduClient;
@@ -48,6 +44,9 @@ import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RangePartitionBound;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
 /**
  * This is a helper for the CatalogOpExecutor to provide Kudu related DDL 
functionality
  * such as creating and dropping tables from Kudu.
@@ -67,7 +66,7 @@ public class KuduCatalogOpExecutor {
     String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
     LOG.debug(String.format("Creating table '%s' in master '%s'", 
kuduTableName,
         masterHosts));
-    try (KuduClient kudu = new 
KuduClient.KuduClientBuilder(masterHosts).build()) {
+    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
       // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure 
atomicity.
       // (see KUDU-1710).
       if (kudu.tableExists(kuduTableName)) {
@@ -79,7 +78,7 @@ public class KuduCatalogOpExecutor {
       CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
       kudu.createTable(kuduTableName, schema, tableOpts);
     } catch (Exception e) {
-      throw new ImpalaRuntimeException(String.format("Error creating table 
'%s'",
+      throw new ImpalaRuntimeException(String.format("Error creating Kudu 
table '%s'",
           kuduTableName), e);
     }
   }
@@ -158,14 +157,16 @@ public class KuduCatalogOpExecutor {
     // Set the number of table replicas, if specified.
     String replication = 
msTbl.getParameters().get(KuduTable.KEY_TABLET_REPLICAS);
     if (!Strings.isNullOrEmpty(replication)) {
+      int parsedReplicas = -1;
       try {
-        int r = Integer.parseInt(replication);
-        Preconditions.checkState(r > 0);
-        tableOpts.setNumReplicas(r);
-      } catch (NumberFormatException e) {
+        parsedReplicas = Integer.parseInt(replication);
+        Preconditions.checkState(parsedReplicas > 0,
+            "Invalid number of replicas table property:" + replication);
+      } catch (Exception e) {
         throw new ImpalaRuntimeException(String.format("Invalid number of 
table " +
-            "replicas specified: '%s'", replication), e);
+            "replicas specified: '%s'", replication));
       }
+      tableOpts.setNumReplicas(parsedReplicas);
     }
     return tableOpts;
   }
@@ -182,7 +183,7 @@ public class KuduCatalogOpExecutor {
     String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
     LOG.debug(String.format("Dropping table '%s' from master '%s'", tableName,
         masterHosts));
-    try (KuduClient kudu = new 
KuduClient.KuduClientBuilder(masterHosts).build()) {
+    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
       Preconditions.checkState(!Strings.isNullOrEmpty(tableName));
       // TODO: The IF EXISTS case should be handled by Kudu to ensure 
atomicity.
       // (see KUDU-1710).
@@ -211,7 +212,7 @@ public class KuduCatalogOpExecutor {
     String masterHosts = 
msTblCopy.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
     LOG.debug(String.format("Loading schema of table '%s' from master '%s'",
         kuduTableName, masterHosts));
-    try (KuduClient kudu = new 
KuduClient.KuduClientBuilder(masterHosts).build()) {
+    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
       if (!kudu.tableExists(kuduTableName)) {
         throw new ImpalaRuntimeException(String.format("Table does not exist 
in Kudu: " +
             "'%s'", kuduTableName));
@@ -247,9 +248,10 @@ public class KuduCatalogOpExecutor {
     Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
     String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
     Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
-    try (KuduClient kudu = new 
KuduClient.KuduClientBuilder(masterHosts).build()) {
+    try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) {
       kudu.tableExists(kuduTableName);
     } catch (Exception e) {
+      // TODO: This is misleading when there are other errors, e.g. timeouts.
       throw new ImpalaRuntimeException(String.format("Kudu table '%s' does not 
exist " +
           "on master '%s'", kuduTableName, masterHosts), e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 9ebc480..c50bf25 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -17,33 +17,49 @@
 
 package org.apache.impala.util;
 
+import static java.lang.String.format;
+
 import java.util.HashSet;
 import java.util.List;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
-
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduClient.KuduClientBuilder;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RangePartitionBound;
 
-import static java.lang.String.format;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class KuduUtil {
 
   private static final String KUDU_TABLE_NAME_PREFIX = "impala::";
 
   /**
+   * Creates a KuduClient with the specified Kudu master addresses (as a comma-separated
+   * list of host:port pairs). The 'admin operation timeout' and the 'operation timeout'
+   * are set to BackendConfig.getKuduClientTimeoutMs(). The 'admin operation timeout' is
+   * used for operations like creating/deleting tables. The 'operation timeout' is used
+   * when fetching tablet metadata.
+   */
+  public static KuduClient createKuduClient(String kuduMasters) {
+    KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters);
+    b.defaultAdminOperationTimeoutMs(BackendConfig.getKuduClientTimeoutMs());
+    b.defaultOperationTimeoutMs(BackendConfig.getKuduClientTimeoutMs());
+    return b.build();
+  }
+
+  /**
    * Creates a PartialRow from a list of range partition boundary values.
    */
   private static PartialRow parseRangePartitionBoundaryValues(Schema schema,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
new file mode 100644
index 0000000..253c139
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test
@@ -0,0 +1,25 @@
+====
+---- QUERY
+# TODO: improve error messages (here and below) when KUDU-1734 is resolved
+describe functional_kudu.alltypes
+---- CATCH
+Error opening Kudu table 'impala::functional_kudu.alltypes'
+====
+---- QUERY
+show create table functional_kudu.alltypes
+---- CATCH
+Error opening Kudu table 'impala::functional_kudu.alltypes'
+====
+---- QUERY
+create table test_kudu (x int primary key)
+distribute by hash(x) into 3 buckets stored as kudu
+---- CATCH
+Error creating Kudu table
+====
+---- QUERY
+create external table test_kudu
+stored as kudu
+tblproperties('kudu.table_name'='doesnt_matter_this_times_out')
+---- CATCH
+Error loading schema of table 'doesnt_matter_this_times_out'
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
new file mode 100644
index 0000000..ba3341e
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
@@ -0,0 +1,13 @@
+====
+---- QUERY
+# Expected timeout while planning the scan node.
+# TODO: improve error messages (here and below) when KUDU-1734 is resolved
+select * from functional_kudu.alltypes
+---- CATCH
+Unable to initialize the Kudu scan node
+====
+---- QUERY
+show table stats functional_kudu.alltypes
+---- CATCH
+Error accessing Kudu for table stats
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 46b7eee..afa8e2c 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -211,7 +211,8 @@ class ImpalaTestSuite(BaseTestSuite):
       # Strip newlines so we can split error message into multiple lines
       expected_str = expected_str.replace('\n', '')
       if expected_str in actual_str: return
-    assert False, 'Unexpected exception string: %s' % actual_str
+    assert False, 'Unexpected exception string. Expected: %s\nNot found in 
actual: %s' % \
+      (expected_str, actual_str)
 
   def __verify_results_and_errors(self, vector, test_section, result, use_db):
     """Verifies that both results and error sections are as expected. Rewrites 
both

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/tests/custom_cluster/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
index 898a29e..bd1584b 100644
--- a/tests/custom_cluster/test_kudu.py
+++ b/tests/custom_cluster/test_kudu.py
@@ -51,3 +51,27 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
           'kudu.table_name'='%s')
           """ % (table_name, KUDU_MASTER_HOSTS, kudu_table.name))
       cursor.execute("DROP TABLE %s" % table_name)
+
+
+class TestKuduClientTimeout(CustomClusterTestSuite, KuduTestSuite):
+  """Kudu tests that set the Kudu client operation timeout to 1ms and expect
+     specific timeout exceptions. While we expect all exercised operations to take at
+     least 1ms, it is possible that some may not and thus the test could be flaky. If
+     this turns out to be the case, specific tests may need to be re-considered or
+     removed."""
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-kudu_operation_timeout_ms=1")
+  def test_impalad_timeout(self, vector):
+    """Check impalad behavior when -kudu_operation_timeout_ms is too low."""
+    self.run_test_case('QueryTest/kudu-timeouts-impalad', vector)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(catalogd_args="-kudu_operation_timeout_ms=1")
+  def test_catalogd_timeout(self, vector):
+    """Check catalogd behavior when -kudu_operation_timeout_ms is too low."""
+    self.run_test_case('QueryTest/kudu-timeouts-catalogd', vector)


Reply via email to