Repository: incubator-impala Updated Branches: refs/heads/hadoop-next eae4f307d -> 32dfcebfa
IMPALA-3771: Expose kudu client timeout and set default The Kudu client timeout was too low for Impala usage. This sets the default timeout to 3 minutes and exposes it as a gflag. New timeout tests were added. Change-Id: Iad95e8e38aad4f76d21bac6879db6c02b3c3e045 Reviewed-on: http://gerrit.cloudera.org:8080/4849 Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/50f7753d Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/50f7753d Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/50f7753d Branch: refs/heads/hadoop-next Commit: 50f7753d2bf3bde0ea67319b680806a071d69871 Parents: 3e18755 Author: Matthew Jacobs <[email protected]> Authored: Wed Oct 26 17:25:30 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sat Nov 5 06:43:45 2016 +0000 ---------------------------------------------------------------------- be/src/catalog/catalog.cc | 6 ++- be/src/common/global-flags.cc | 8 ++++ be/src/exec/kudu-scan-node.cc | 7 +--- be/src/exec/kudu-scan-node.h | 4 +- be/src/exec/kudu-scanner.cc | 5 +-- be/src/exec/kudu-table-sink.cc | 14 ++----- be/src/exec/kudu-table-sink.h | 8 ++-- be/src/exec/kudu-util.cc | 11 +++++ be/src/exec/kudu-util.h | 5 +++ be/src/service/frontend.cc | 7 +++- .../org/apache/impala/catalog/KuduTable.java | 43 +++++++++----------- .../org/apache/impala/planner/KuduScanNode.java | 5 +-- .../apache/impala/service/BackendConfig.java | 18 +++++++- .../org/apache/impala/service/JniCatalog.java | 18 ++++---- .../org/apache/impala/service/JniFrontend.java | 4 +- .../impala/service/KuduCatalogOpExecutor.java | 32 ++++++++------- .../java/org/apache/impala/util/KuduUtil.java | 30 ++++++++++---- .../QueryTest/kudu-timeouts-catalogd.test | 25 ++++++++++++ .../QueryTest/kudu-timeouts-impalad.test | 13 ++++++ tests/common/impala_test_suite.py | 3 +- 
tests/custom_cluster/test_kudu.py | 24 +++++++++++ 21 files changed, 199 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/catalog/catalog.cc ---------------------------------------------------------------------- diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc index 9e7af4e..7d6fdca 100644 --- a/be/src/catalog/catalog.cc +++ b/be/src/catalog/catalog.cc @@ -42,10 +42,11 @@ DEFINE_string(sentry_config, "", "Local path to a sentry-site.xml configuration "file. If set, authorization will be enabled."); DECLARE_int32(non_impala_java_vlog); +DECLARE_int32(kudu_operation_timeout_ms); Catalog::Catalog() { JniMethodDescriptor methods[] = { - {"<init>", "(ZILjava/lang/String;IIZLjava/lang/String;Ljava/lang/String;)V", + {"<init>", "(ZILjava/lang/String;IIZLjava/lang/String;Ljava/lang/String;I)V", &catalog_ctor_}, {"updateCatalog", "([B)[B", &update_metastore_id_}, {"execDdl", "([B)[B", &exec_ddl_id_}, @@ -77,10 +78,11 @@ Catalog::Catalog() { jboolean auth_to_local = FLAGS_load_auth_to_local_rules && !FLAGS_principal.empty(); jstring principal = jni_env->NewStringUTF(FLAGS_principal.c_str()); jstring local_library_dir = jni_env->NewStringUTF(FLAGS_local_library_dir.c_str()); + jint kudu_operation_timeout = FLAGS_kudu_operation_timeout_ms; jobject catalog = jni_env->NewObject(catalog_class_, catalog_ctor_, load_in_background, num_metadata_loading_threads, sentry_config, FlagToTLogLevel(FLAGS_v), FlagToTLogLevel(FLAGS_non_impala_java_vlog), - auth_to_local, principal, local_library_dir); + auth_to_local, principal, local_library_dir, kudu_operation_timeout); EXIT_IF_EXC(jni_env); ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, catalog, &catalog_)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/common/global-flags.cc ---------------------------------------------------------------------- diff --git 
a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index c67ec2c..f75236c 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -108,8 +108,16 @@ DEFINE_int32(fault_injection_rpc_type, 0, "A fault injection option that specifi "which rpc call will be injected with the delay. Effective in debug builds only."); #endif +// Used for testing the path where the Kudu client is stubbed. DEFINE_bool(disable_kudu, false, "If true, Kudu features will be disabled."); +// Timeout (ms) used in the FE for admin and metadata operations (set on the KuduClient), +// and in the BE for scans and writes (set on the KuduScanner and KuduSession +// accordingly). +DEFINE_int32(kudu_operation_timeout_ms, 3 * 60 * 1000, "Timeout (milliseconds) set for " + "all Kudu operations. This must be a positive value, and there is no way to disable " + "timeouts."); + DEFINE_bool(enable_accept_queue_server, true, "If true, uses a modified version of " "TThreadedServer that accepts connections as quickly as possible and hands them off " http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index 1aef102..f98077e 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -121,12 +121,7 @@ Status KuduScanNode::Open(RuntimeState* state) { const KuduTableDescriptor* table_desc = static_cast<const KuduTableDescriptor*>(tuple_desc_->table_desc()); - kudu::client::KuduClientBuilder b; - for (const string& address: table_desc->kudu_master_addresses()) { - b.add_master_server_addr(address); - } - - KUDU_RETURN_IF_ERROR(b.Build(&client_), "Unable to create Kudu client"); + RETURN_IF_ERROR(CreateKuduClient(table_desc->kudu_master_addresses(), &client_)); uint64_t latest_ts = static_cast<uint64_t>( max<int64_t>(0, 
state->query_ctx().session.kudu_latest_observed_ts)); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h index 79ed480..84e2531 100644 --- a/be/src/exec/kudu-scan-node.h +++ b/be/src/exec/kudu-scan-node.h @@ -65,8 +65,8 @@ class KuduScanNode : public ScanNode { const TupleDescriptor* tuple_desc_; /// The Kudu client and table. Scanners share these instances. - std::tr1::shared_ptr<kudu::client::KuduClient> client_; - std::tr1::shared_ptr<kudu::client::KuduTable> table_; + kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_; + kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_; /// Set of scan tokens to be deserialized into Kudu scanners. std::vector<std::string> scan_tokens_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index d230985..c13b6a8 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -49,8 +49,7 @@ DEFINE_int32(kudu_scanner_keep_alive_period_sec, 15, "The period at which Kudu Scanners should send keep-alive requests to the tablet " "server to ensure that scanners do not time out."); -DEFINE_int32(kudu_scanner_timeout_sec, 60, - "The timeout used for Kudu Scan requests."); +DECLARE_int32(kudu_operation_timeout_ms); namespace impala { @@ -134,7 +133,7 @@ Status KuduScanner::OpenNextScanToken(const string& scan_token) { "Could not set replica selection."); } - KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_scanner_timeout_sec * 1000), + KUDU_RETURN_IF_ERROR(scanner_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms), "Could not set scanner timeout"); { 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc index fc26a53..14b1889 100644 --- a/be/src/exec/kudu-table-sink.cc +++ b/be/src/exec/kudu-table-sink.cc @@ -31,11 +31,11 @@ #include "common/names.h" -DEFINE_int32(kudu_session_timeout_seconds, 60, "Timeout set on the Kudu session. " - "How long to wait before considering a write failed."); DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) of the " "Kudu client buffer for mutations."); +DECLARE_int32(kudu_operation_timeout_ms); + using kudu::client::KuduColumnSchema; using kudu::client::KuduSchema; using kudu::client::KuduClient; @@ -112,19 +112,13 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) { Status KuduTableSink::Open(RuntimeState* state) { RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state)); - - kudu::client::KuduClientBuilder b; - for (const string& address: table_desc_->kudu_master_addresses()) { - b.add_master_server_addr(address); - } - - KUDU_RETURN_IF_ERROR(b.Build(&client_), "Unable to create Kudu client"); + RETURN_IF_ERROR(CreateKuduClient(table_desc_->kudu_master_addresses(), &client_)); KUDU_RETURN_IF_ERROR(client_->OpenTable(table_desc_->table_name(), &table_), "Unable to open Kudu table"); session_ = client_->NewSession(); - session_->SetTimeoutMillis(FLAGS_kudu_session_timeout_seconds * 1000); + session_->SetTimeoutMillis(FLAGS_kudu_operation_timeout_ms); // KuduSession Set* methods here and below return a status for API compatibility. 
// As long as the Kudu client is statically linked, these shouldn't fail and thus these http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h index fe278c5..c37b15d 100644 --- a/be/src/exec/kudu-table-sink.h +++ b/be/src/exec/kudu-table-sink.h @@ -100,10 +100,10 @@ class KuduTableSink : public DataSink { std::vector<ExprContext*> output_expr_ctxs_; /// The Kudu client, table and session. - /// This uses 'std::tr1::shared_ptr' as that is the type expected by Kudu. - std::tr1::shared_ptr<kudu::client::KuduClient> client_; - std::tr1::shared_ptr<kudu::client::KuduTable> table_; - std::tr1::shared_ptr<kudu::client::KuduSession> session_; + /// This uses 'kudu::client::sp::shared_ptr' as that is the type expected by Kudu. + kudu::client::sp::shared_ptr<kudu::client::KuduClient> client_; + kudu::client::sp::shared_ptr<kudu::client::KuduTable> table_; + kudu::client::sp::shared_ptr<kudu::client::KuduSession> session_; /// Used to specify the type of write operation (INSERT/UPDATE/DELETE). 
TSinkAction::type sink_action_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-util.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc index aedf229..5afa0a4 100644 --- a/be/src/exec/kudu-util.cc +++ b/be/src/exec/kudu-util.cc @@ -28,9 +28,12 @@ #include "common/status.h" using kudu::client::KuduSchema; +using kudu::client::KuduClient; +using kudu::client::KuduClientBuilder; using kudu::client::KuduColumnSchema; DECLARE_bool(disable_kudu); +DECLARE_int32(kudu_operation_timeout_ms); namespace impala { @@ -54,6 +57,14 @@ Status CheckKuduAvailability() { return Status(TErrorCode::KUDU_NOT_SUPPORTED_ON_OS); } +Status CreateKuduClient(const vector<string>& master_addrs, + kudu::client::sp::shared_ptr<KuduClient>* client) { + kudu::client::KuduClientBuilder b; + for (const string& address: master_addrs) b.add_master_server_addr(address); + KUDU_RETURN_IF_ERROR(b.Build(client), "Unable to create Kudu client"); + return Status::OK(); +} + string KuduSchemaDebugString(const KuduSchema& schema) { stringstream ss; for (int i = 0; i < schema.num_columns(); ++i) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/exec/kudu-util.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h index 7957f7a..7774812 100644 --- a/be/src/exec/kudu-util.h +++ b/be/src/exec/kudu-util.h @@ -37,6 +37,11 @@ Status CheckKuduAvailability(); /// Convenience function for the bool equivalent of CheckKuduAvailability(). bool KuduIsAvailable(); +/// Creates a new KuduClient using the specified master addresses. If any error occurs, +/// 'client' is not set and an error status is returned. +Status CreateKuduClient(const std::vector<std::string>& master_addrs, + kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client); + /// Returns a debug string for the KuduSchema.
std::string KuduSchemaDebugString(const kudu::client::KuduSchema& schema); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/be/src/service/frontend.cc ---------------------------------------------------------------------- diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc index ca8ecbb..f8cecfb 100644 --- a/be/src/service/frontend.cc +++ b/be/src/service/frontend.cc @@ -33,6 +33,7 @@ DECLARE_string(sentry_config); DECLARE_int32(non_impala_java_vlog); DECLARE_bool(load_auth_to_local_rules); DECLARE_string(principal); +DECLARE_int32(kudu_operation_timeout_ms); DEFINE_bool(load_catalog_at_startup, false, "if true, load all catalog data at startup"); @@ -64,7 +65,7 @@ DEFINE_string(kudu_master_hosts, "", "Specifies the default Kudu master(s). The Frontend::Frontend() { JniMethodDescriptor methods[] = { {"<init>", "(ZLjava/lang/String;Ljava/lang/String;Ljava/lang/String;" - "Ljava/lang/String;IIZLjava/lang/String;)V", &fe_ctor_}, + "Ljava/lang/String;IIZLjava/lang/String;I)V", &fe_ctor_}, {"createExecRequest", "([B)[B", &create_exec_request_id_}, {"getExplainPlan", "([B)Ljava/lang/String;", &get_explain_plan_id_}, {"getHadoopConfig", "([B)[B", &get_hadoop_config_id_}, @@ -114,9 +115,11 @@ Frontend::Frontend() { // and impala is kerberized. 
jboolean auth_to_local = FLAGS_load_auth_to_local_rules && !FLAGS_principal.empty(); jstring kudu_master_hosts = jni_env->NewStringUTF(FLAGS_kudu_master_hosts.c_str()); + jint kudu_operation_timeout = FLAGS_kudu_operation_timeout_ms; jobject fe = jni_env->NewObject(fe_class_, fe_ctor_, lazy, server_name, policy_file_path, sentry_config, auth_provider_class, FlagToTLogLevel(FLAGS_v), - FlagToTLogLevel(FLAGS_non_impala_java_vlog), auth_to_local, kudu_master_hosts); + FlagToTLogLevel(FLAGS_non_impala_java_vlog), auth_to_local, kudu_master_hosts, + kudu_operation_timeout); EXIT_IF_EXC(jni_env); ABORT_IF_ERROR(JniUtil::LocalToGlobalRef(jni_env, fe, &fe_)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/catalog/KuduTable.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java index a7a5933..6c30077 100644 --- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java @@ -23,10 +23,15 @@ import java.util.Set; import javax.xml.bind.DatatypeConverter; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.impala.analysis.ColumnDef; import org.apache.impala.analysis.DistributeParam; -import org.apache.impala.analysis.ToSqlUtils; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.service.BackendConfig; +import org.apache.impala.service.CatalogOpExecutor; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TDistributeByHashParam; @@ -40,27 +45,21 @@ import org.apache.impala.thrift.TTableDescriptor; 
import org.apache.impala.thrift.TTableType; import org.apache.impala.util.KuduUtil; import org.apache.impala.util.TResultRowBuilder; -import org.apache.impala.service.CatalogOpExecutor; - -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.kudu.ColumnSchema; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.KuduException; import org.apache.kudu.client.LocatedTablet; +import org.apache.kudu.client.PartitionSchema; import org.apache.kudu.client.PartitionSchema.HashBucketSchema; import org.apache.kudu.client.PartitionSchema.RangeSchema; -import org.apache.kudu.client.PartitionSchema; import org.apache.log4j.Logger; import org.apache.thrift.TException; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + /** * Representation of a Kudu table in the catalog cache. */ @@ -94,8 +93,6 @@ public class KuduTable extends Table { // Key to specify the number of tablet replicas. public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas"; - public static final long KUDU_RPC_TIMEOUT_MS = 50000; - // Table name in the Kudu storage engine. It may not neccessarily be the same as the // table name specified in the CREATE TABLE statement; the latter // is stored in Table.name_. 
Reasons why KuduTable.kuduTableName_ and Table.name_ may @@ -172,12 +169,12 @@ public class KuduTable extends Table { numRows_ = getRowCount(msTable_.getParameters()); // Connect to Kudu to retrieve table metadata - try (KuduClient kuduClient = new KuduClient.KuduClientBuilder( - getKuduMasterHosts()).build()) { + try (KuduClient kuduClient = KuduUtil.createKuduClient(getKuduMasterHosts())) { kuduTable = kuduClient.openTable(kuduTableName_); } catch (KuduException e) { - LOG.error("Error accessing Kudu table " + kuduTableName_); - throw new TableLoadingException(e.getMessage()); + throw new TableLoadingException(String.format( + "Error opening Kudu table '%s', Kudu error: %s", + kuduTableName_, e.getMessage())); } Preconditions.checkNotNull(kuduTable); @@ -187,7 +184,6 @@ public class KuduTable extends Table { loadDistributeByParams(kuduTable); loadAllColumnStats(msClient); } catch (ImpalaRuntimeException e) { - LOG.error("Error loading metadata for Kudu table: " + kuduTableName_); throw new TableLoadingException("Error loading metadata for Kudu table " + kuduTableName_, e); } @@ -341,11 +337,10 @@ public class KuduTable extends Table { resultSchema.addToColumns(new TColumn("Leader Replica", Type.STRING.toThrift())); resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift())); - try (KuduClient client = new KuduClient.KuduClientBuilder( - getKuduMasterHosts()).build()) { + try (KuduClient client = KuduUtil.createKuduClient(getKuduMasterHosts())) { org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_); List<LocatedTablet> tablets = - kuduTable.getTabletsLocations(KUDU_RPC_TIMEOUT_MS); + kuduTable.getTabletsLocations(BackendConfig.getKuduClientTimeoutMs()); for (LocatedTablet tab: tablets) { TResultRowBuilder builder = new TResultRowBuilder(); builder.add("-1"); // The Kudu client API doesn't expose tablet row counts. 
@@ -366,7 +361,7 @@ public class KuduTable extends Table { } } catch (Exception e) { - throw new ImpalaRuntimeException("Could not communicate with Kudu.", e); + throw new ImpalaRuntimeException("Error accessing Kudu for table stats.", e); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index 7ef1c20..6f20abd 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -44,10 +44,10 @@ import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TScanRange; import org.apache.impala.thrift.TScanRangeLocation; import org.apache.impala.thrift.TScanRangeLocationList; +import org.apache.impala.util.KuduUtil; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduClient; -import org.apache.kudu.client.KuduClient.KuduClientBuilder; import org.apache.kudu.client.KuduPredicate; import org.apache.kudu.client.KuduPredicate.ComparisonOp; import org.apache.kudu.client.KuduScanToken; @@ -106,8 +106,7 @@ public class KuduScanNode extends ScanNode { analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_); conjuncts_ = orderConjunctsByCost(conjuncts_); - try (KuduClient client = - new KuduClientBuilder(kuduTable_.getKuduMasterHosts()).build()) { + try (KuduClient client = KuduUtil.createKuduClient(kuduTable_.getKuduMasterHosts())) { org.apache.kudu.client.KuduTable rpcTable = client.openTable(kuduTable_.getKuduTableName()); validateSchema(rpcTable); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/BackendConfig.java 
---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index 75dd1f7..c90f674 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -17,6 +17,8 @@ package org.apache.impala.service; +import com.google.common.base.Preconditions; + /** * This class is meant to provide the FE with impalad backend configuration parameters, * including command line arguments. @@ -31,10 +33,15 @@ public class BackendConfig { // the default FLAGS_read_size used by the IO manager in the backend. private final long READ_SIZE; - // This is overriden by JniFrontend/JniCatalog classes with user set configuration. - // TODO: Read this from backend instead of using static variables. + // Variables below are overridden by JniFrontend/JniCatalog with user set configuration. + // TODO: Read from backend instead of using static variables. + + // Determines how principal to short name conversion works. See User.java for more info. private static boolean allowAuthToLocalRules_ = false; + // Kudu client timeout (ms). + private static int kuduOperationTimeoutMs_ = 3 * 60 * 1000; + private BackendConfig() { // TODO: Populate these by making calls to the backend instead of default constants.
READ_SIZE = 8 * 1024 * 1024L; @@ -46,4 +53,11 @@ public class BackendConfig { public static void setAuthToLocal(boolean authToLocal) { allowAuthToLocalRules_ = authToLocal; } + + public static int getKuduClientTimeoutMs() { return kuduOperationTimeoutMs_; } + + public static void setKuduClientTimeoutMs(int kuduOperationTimeoutMs) { + Preconditions.checkArgument(kuduOperationTimeoutMs > 0); + BackendConfig.kuduOperationTimeoutMs_ = kuduOperationTimeoutMs; + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/JniCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index b35877f..7993191 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -21,12 +21,6 @@ import java.util.ArrayList; import java.util.List; import java.util.UUID; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.impala.authorization.SentryConfig; import org.apache.impala.authorization.User; import org.apache.impala.catalog.CatalogException; @@ -36,7 +30,6 @@ import org.apache.impala.catalog.Function; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; import org.apache.impala.common.JniUtil; -import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TDdlExecRequest; @@ -56,6 +49,12 @@ import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUpdateCatalogRequest; import org.apache.impala.util.GlogAppender; import org.apache.impala.util.PatternMatcher; +import 
org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -81,9 +80,10 @@ public class JniCatalog { public JniCatalog(boolean loadInBackground, int numMetadataLoadingThreads, String sentryServiceConfig, int impalaLogLevel, int otherLogLevel, - boolean allowAuthToLocal, String kerberosPrincipal, String localLibraryPath) - throws InternalException { + boolean allowAuthToLocal, String kerberosPrincipal, String localLibraryPath, + int kuduOperationTimeoutMs) throws InternalException { BackendConfig.setAuthToLocal(allowAuthToLocal); + BackendConfig.setKuduClientTimeoutMs(kuduOperationTimeoutMs); Preconditions.checkArgument(numMetadataLoadingThreads > 0); // This trick saves having to pass a TLogLevel enum, which is an object and more // complex to pass through JNI. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/JniFrontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 0d502e5..3ef89e3 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -117,9 +117,11 @@ public class JniFrontend { */ public JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile, String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel, - int otherLogLevel, boolean allowAuthToLocal, String defaultKuduMasterHosts) + int otherLogLevel, boolean allowAuthToLocal, String defaultKuduMasterHosts, + int kuduOperationTimeoutMs) throws InternalException { BackendConfig.setAuthToLocal(allowAuthToLocal); + BackendConfig.setKuduClientTimeoutMs(kuduOperationTimeoutMs); GlogAppender.Install(TLogLevel.values()[impalaLogLevel], TLogLevel.values()[otherLogLevel]); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index 18043c2..a2b1fb9 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -17,7 +17,6 @@ package org.apache.impala.service; -import java.lang.NumberFormatException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -25,9 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.base.Preconditions; -import 
com.google.common.base.Strings; - import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Table; @@ -39,8 +35,8 @@ import org.apache.impala.thrift.TCreateTableParams; import org.apache.impala.thrift.TDistributeParam; import org.apache.impala.thrift.TRangePartition; import org.apache.impala.util.KuduUtil; -import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; import org.apache.kudu.ColumnSchema; +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduClient; @@ -48,6 +44,9 @@ import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RangePartitionBound; import org.apache.log4j.Logger; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + /** * This is a helper for the CatalogOpExecutor to provide Kudu related DDL functionality * such as creating and dropping tables from Kudu. @@ -67,7 +66,7 @@ public class KuduCatalogOpExecutor { String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS); LOG.debug(String.format("Creating table '%s' in master '%s'", kuduTableName, masterHosts)); - try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity. // (see KUDU-1710). 
if (kudu.tableExists(kuduTableName)) { @@ -79,7 +78,7 @@ public class KuduCatalogOpExecutor { CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema); kudu.createTable(kuduTableName, schema, tableOpts); } catch (Exception e) { - throw new ImpalaRuntimeException(String.format("Error creating table '%s'", + throw new ImpalaRuntimeException(String.format("Error creating Kudu table '%s'", kuduTableName), e); } } @@ -158,14 +157,16 @@ public class KuduCatalogOpExecutor { // Set the number of table replicas, if specified. String replication = msTbl.getParameters().get(KuduTable.KEY_TABLET_REPLICAS); if (!Strings.isNullOrEmpty(replication)) { + int parsedReplicas = -1; try { - int r = Integer.parseInt(replication); - Preconditions.checkState(r > 0); - tableOpts.setNumReplicas(r); - } catch (NumberFormatException e) { + parsedReplicas = Integer.parseInt(replication); + Preconditions.checkState(parsedReplicas > 0, + "Invalid number of replicas table property:" + replication); + } catch (Exception e) { throw new ImpalaRuntimeException(String.format("Invalid number of table " + - "replicas specified: '%s'", replication), e); + "replicas specified: '%s'", replication)); } + tableOpts.setNumReplicas(parsedReplicas); } return tableOpts; } @@ -182,7 +183,7 @@ public class KuduCatalogOpExecutor { String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS); LOG.debug(String.format("Dropping table '%s' from master '%s'", tableName, masterHosts)); - try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { Preconditions.checkState(!Strings.isNullOrEmpty(tableName)); // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity. // (see KUDU-1710). 
@@ -211,7 +212,7 @@ public class KuduCatalogOpExecutor { String masterHosts = msTblCopy.getParameters().get(KuduTable.KEY_MASTER_HOSTS); LOG.debug(String.format("Loading schema of table '%s' from master '%s'", kuduTableName, masterHosts)); - try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { if (!kudu.tableExists(kuduTableName)) { throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " + "'%s'", kuduTableName)); @@ -247,9 +248,10 @@ public class KuduCatalogOpExecutor { Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts)); String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME); Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName)); - try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + try (KuduClient kudu = KuduUtil.createKuduClient(masterHosts)) { kudu.tableExists(kuduTableName); } catch (Exception e) { + // TODO: This is misleading when there are other errors, e.g. timeouts. 
throw new ImpalaRuntimeException(String.format("Kudu table '%s' does not exist " + "on master '%s'", kuduTableName, masterHosts), e); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/fe/src/main/java/org/apache/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index 9ebc480..c50bf25 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -17,33 +17,49 @@ package org.apache.impala.util; +import static java.lang.String.format; + import java.util.HashSet; import java.util.List; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Pair; +import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TExpr; import org.apache.impala.thrift.TExprNode; - import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduClient.KuduClientBuilder; import org.apache.kudu.client.PartialRow; import org.apache.kudu.client.RangePartitionBound; -import static java.lang.String.format; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class KuduUtil { private static final String KUDU_TABLE_NAME_PREFIX = "impala::"; /** + * Creates a KuduClient with the specified Kudu master addresses (as a comma-separated + * list of host:port pairs). 
The 'admin operation timeout' and the 'operation timeout' + * are set to BackendConfig.getKuduClientTimeoutMs(). The 'admin operations timeout' is + * used for operations like creating/deleting tables. The 'operation timeout' is used + * when fetching tablet metadata. + */ + public static KuduClient createKuduClient(String kuduMasters) { + KuduClientBuilder b = new KuduClient.KuduClientBuilder(kuduMasters); + b.defaultAdminOperationTimeoutMs(BackendConfig.getKuduClientTimeoutMs()); + b.defaultOperationTimeoutMs(BackendConfig.getKuduClientTimeoutMs()); + return b.build(); + } + + /** * Creates a PartialRow from a list of range partition boundary values. */ private static PartialRow parseRangePartitionBoundaryValues(Schema schema, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test new file mode 100644 index 0000000..253c139 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-catalogd.test @@ -0,0 +1,25 @@ +==== +---- QUERY +# TODO: improve error messages (here and below) when KUDU-1734 is resolved +describe functional_kudu.alltypes +---- CATCH +Error opening Kudu table 'impala::functional_kudu.alltypes' +==== +---- QUERY +show create table functional_kudu.alltypes +---- CATCH +Error opening Kudu table 'impala::functional_kudu.alltypes' +==== +---- QUERY +create table test_kudu (x int primary key) +distribute by hash(x) into 3 buckets stored as kudu +---- CATCH +Error creating Kudu table +==== +---- QUERY +create external table test_kudu +stored as kudu +tblproperties('kudu.table_name'='doesnt_matter_this_times_out') +---- CATCH +Error loading schema of table 
'doesnt_matter_this_times_out' +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test new file mode 100644 index 0000000..ba3341e --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test @@ -0,0 +1,13 @@ +==== +---- QUERY +# Expected timeout while planning the scan node. +# TODO: improve error messages (here and below) when KUDU-1734 is resolved +select * from functional_kudu.alltypes +---- CATCH +Unable to initialize the Kudu scan node +==== +---- QUERY +show table stats functional_kudu.alltypes +---- CATCH +Error accessing Kudu for table stats +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/tests/common/impala_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 46b7eee..afa8e2c 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -211,7 +211,8 @@ class ImpalaTestSuite(BaseTestSuite): # Strip newlines so we can split error message into multiple lines expected_str = expected_str.replace('\n', '') if expected_str in actual_str: return - assert False, 'Unexpected exception string: %s' % actual_str + assert False, 'Unexpected exception string. Expected: %s\nNot found in actual: %s' % \ + (expected_str, actual_str) def __verify_results_and_errors(self, vector, test_section, result, use_db): """Verifies that both results and error sections are as expected. 
Rewrites both http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50f7753d/tests/custom_cluster/test_kudu.py ---------------------------------------------------------------------- diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py index 898a29e..bd1584b 100644 --- a/tests/custom_cluster/test_kudu.py +++ b/tests/custom_cluster/test_kudu.py @@ -51,3 +51,27 @@ class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite): 'kudu.table_name'='%s') """ % (table_name, KUDU_MASTER_HOSTS, kudu_table.name)) cursor.execute("DROP TABLE %s" % table_name) + + +class TestKuduClientTimeout(CustomClusterTestSuite, KuduTestSuite): + """Kudu tests that set the Kudu client operation timeout to 1ms and expect + specific timeout exceptions. While we expect all exercised operations to take at + least 1ms, it is possible that some may not and thus the test could be flaky. If + this turns out to be the case, specific tests may need to be re-considered or + removed.""" + + @classmethod + def get_workload(cls): + return 'functional-query' + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(impalad_args="-kudu_operation_timeout_ms=1") + def test_impalad_timeout(self, vector): + """Check impalad behavior when -kudu_operation_timeout_ms is too low.""" + self.run_test_case('QueryTest/kudu-timeouts-impalad', vector) + + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args(catalogd_args="-kudu_operation_timeout_ms=1") + def test_catalogd_timeout(self, vector): + """Check catalogd behavior when -kudu_operation_timeout_ms is too low.""" + self.run_test_case('QueryTest/kudu-timeouts-catalogd', vector)
