This is an automated email from the ASF dual-hosted git repository. boroknagyz pushed a commit to branch branch-4.4.0 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 73f13f0e9f225400bb641c48da57a9871b5d8383
Author: Michael Smith <michael.sm...@cloudera.com>
AuthorDate: Wed Apr 17 16:54:00 2024 -0700

IMPALA-13012: Lower default query_log_max_queued

Sets the query_log_max_queued default such that
  query_log_max_queued * num_columns(49) < statement_expression_limit
to avoid triggering e.g.
  AnalysisException: Exceeded the statement expression limit (250000)
  Statement has 370039 expressions.

Also increases statement_expression_limit for insertion to avoid an error if query_log_max_queued is changed.

Logs time taken to write to the queries table for help with debugging and adds histogram "impala-server.completed-queries.write-durations".

Fixes InternalServer so it uses 'default_query_options'.

Change-Id: I6535675307d88cb65ba7d908f3c692e0cf3259d7
Reviewed-on: http://gerrit.cloudera.org:8080/21351
Reviewed-by: Michael Smith <michael.sm...@cloudera.com>
Tested-by: Michael Smith <michael.sm...@cloudera.com>
Reviewed-by: Riza Suminto <riza.sumi...@cloudera.com>
(cherry picked from commit ba32d70891fd68c5c1234ed543b74c51661bf272)
---
 be/src/service/impala-server.h | 7 ++--
 be/src/service/internal-server-test.cc | 36 ++++++++----------
 be/src/service/internal-server.cc | 24 +++++-------
 be/src/service/internal-server.h | 10 +++--
 be/src/service/query-options.cc | 7 +++-
 be/src/service/query-options.h | 6 +++
 be/src/service/workload-management-flags.cc | 2 +-
 be/src/service/workload-management.cc | 57 +++++++++++++++++++----------
 be/src/util/impalad-metrics.cc | 6 +++
 be/src/util/impalad-metrics.h | 4 ++
 common/thrift/SystemTables.thrift | 2 +
 common/thrift/metrics.json | 12 +++++-
 tests/custom_cluster/test_query_log.py | 13 ++++---
 13 files changed, 115 insertions(+), 71 deletions(-)

diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index e3562959d..b1ef8fa4d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -410,18 +410,17 @@ class ImpalaServer : public ImpalaServiceIf,
/// InternalServer methods, see internal-server.h for details virtual Status OpenSession(const std::string& user_name, TUniqueId& new_session_id, - const TQueryOptions& query_opts = TQueryOptions()); + const QueryOptionMap& query_opts = {}); virtual bool CloseSession(const impala::TUniqueId& session_id); virtual Status ExecuteIgnoreResults(const std::string& user_name, - const std::string& sql, const TQueryOptions& query_opts = TQueryOptions(), + const std::string& sql, const QueryOptionMap& query_opts = {}, const bool persist_in_db = true, TUniqueId* query_id = nullptr); virtual Status ExecuteAndFetchAllText(const std::string& user_name, const std::string& sql, query_results& results, results_columns* columns = nullptr, TUniqueId* query_id = nullptr); virtual Status SubmitAndWait(const std::string& user_name, const std::string& sql, TUniqueId& new_session_id, TUniqueId& new_query_id, - const TQueryOptions& query_opts = TQueryOptions(), - const bool persist_in_db = true); + const QueryOptionMap& query_opts = {}, const bool persist_in_db = true); virtual Status WaitForResults(TUniqueId& query_id); virtual Status SubmitQuery(const std::string& sql, const impala::TUniqueId& session_id, TUniqueId& new_query_id, const bool persist_in_db = true); diff --git a/be/src/service/internal-server-test.cc b/be/src/service/internal-server-test.cc index c1cb1ab79..ecc184283 100644 --- a/be/src/service/internal-server-test.cc +++ b/be/src/service/internal-server-test.cc @@ -130,7 +130,7 @@ class DatabaseTest { TUniqueId query_id; EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", StrCat("create database ", database_name_, " comment 'Temporary database created and managed by " - "internal-server-test'"), TQueryOptions(), false, &query_id)); + "internal-server-test'"), {}, false, &query_id)); assertQueryState(query_id, QUERY_STATE_SUCCESS); if (create_table) { @@ -138,7 +138,7 @@ class DatabaseTest { EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", StrCat("create table 
", table_name_, "(id INT, name STRING, first_sold TIMESTAMP, " "last_sold TIMESTAMP, price DECIMAL(30, 2)) partitioned by (category INT)"), - TQueryOptions(), false, &query_id)); + {}, false, &query_id)); assertQueryState(query_id, QUERY_STATE_SUCCESS); // Insert some products that have a last_sold time. @@ -162,7 +162,7 @@ class DatabaseTest { } } - EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql1, TQueryOptions(), + EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql1, {}, false, &query_id)); assertQueryState(query_id, QUERY_STATE_SUCCESS); @@ -183,7 +183,7 @@ class DatabaseTest { } } - EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql2, TQueryOptions(), + EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql2, {}, false, &query_id)); assertQueryState(query_id, QUERY_STATE_SUCCESS); } @@ -221,13 +221,11 @@ TEST(InternalServerTest, QueryTimeout) { DatabaseTest db_test = DatabaseTest(impala_server_, "query_timeout", true, 5); InternalServer* fixture = impala_server_.get(); - TQueryOptions query_opts; - query_opts.__set_fetch_rows_timeout_ms(1); - TUniqueId session_id; TUniqueId query_id; - ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts)); + ASSERT_OK(fixture->OpenSession("impala", session_id, + {{TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS, "1"}})); // Run a query that will execute for longer than the configured exec timeout. ASSERT_OK(fixture->SubmitQuery(StrCat("select * from ", db_test.GetTableName(), @@ -249,12 +247,10 @@ TEST(InternalServerTest, QueryTimeout) { // Asserts the expected error is returned when a query option is set to an invalid value. 
TEST(InternalServerTest, InvalidQueryOption) { InternalServer* fixture = impala_server_.get(); - TQueryOptions query_opts; - - query_opts.__set_mem_limit_executors(-2); TUniqueId session_id; - Status stat = fixture->OpenSession("impala", session_id, query_opts); + Status stat = fixture->OpenSession("impala", session_id, + {{TImpalaQueryOptions::MEM_LIMIT_EXECUTORS, "-2"}}); ASSERT_FALSE(stat.ok()); ASSERT_EQ("Failed to parse query option 'MEM_LIMIT_EXECUTORS': -2", stat.msg().msg()); @@ -281,7 +277,7 @@ TEST(InternalServerTest, MultipleQueriesMultipleSessions) { // Insert a record into the test table using a new session. ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("insert into ", test_table_name, "(id,first_name,last_name) VALUES (1,'test','person1')"), - TQueryOptions(), false, &query_id)); + {}, false, &query_id)); assertQueryState(query_id, QUERY_STATE_SUCCESS); // Select a record from the test table using a new session. @@ -317,10 +313,8 @@ TEST(InternalServerTest, RetryFailedQuery) { StrCat("IMPALA_SERVICE_POOL:127.0.0.1:",FLAGS_krpc_port, ":ExecQueryFInstances:FAIL")); - TQueryOptions query_opts; - query_opts.__set_retry_failed_queries(true); - - ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts)); + ASSERT_OK(fixture->OpenSession("impala", session_id, + {{TImpalaQueryOptions::RETRY_FAILED_QUERIES, "true"}})); // Run a query that will fail and get automatically retried. 
ASSERT_OK(fixture->SubmitQuery("select 1", session_id, query_id)); @@ -415,7 +409,7 @@ TEST(InternalServerTest, MissingClosingQuote) { const string expected_msg = "ParseException: Unmatched string literal"; res = fixture->ExecuteIgnoreResults("impala",StrCat( "select * from ", - db_test.GetTableName(), " where name = 'foo"), TQueryOptions(), false, &query_id); + db_test.GetTableName(), " where name = 'foo"), {}, false, &query_id); EXPECT_EQ(TErrorCode::GENERAL, res.code()); EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length())); EXPECT_EQ(TUniqueId(), query_id); @@ -429,7 +423,7 @@ TEST(InternalServerTest, SyntaxError) { const string expected_msg = "ParseException: Syntax error in line 1"; res = fixture->ExecuteIgnoreResults("impala", StrCat("select * from ", - db_test.GetTableName(), "; select"), TQueryOptions(), false, &query_id); + db_test.GetTableName(), "; select"), {}, false, &query_id); EXPECT_EQ(TErrorCode::GENERAL, res.code()); EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length())); EXPECT_EQ(TUniqueId(), query_id); @@ -441,7 +435,7 @@ TEST(InternalServerTest, UnclosedComment) { Status res; const string expected_msg = "ParseException: Syntax error in line 1"; - res = fixture->ExecuteIgnoreResults("impala", "select 1 /*foo", TQueryOptions(), false, + res = fixture->ExecuteIgnoreResults("impala", "select 1 /*foo", {}, false, &query_id); EXPECT_EQ(TErrorCode::GENERAL, res.code()); EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length())); @@ -459,7 +453,7 @@ TEST(InternalServerTest, TableNotExist) { ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("drop table ", db_test.GetTableName(), " purge"))); res = fixture->ExecuteIgnoreResults("impala", StrCat("select * from ", - db_test.GetTableName()), TQueryOptions(), false, &query_id); + db_test.GetTableName()), {}, false, &query_id); EXPECT_EQ(TErrorCode::GENERAL, res.code()); EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length())); 
EXPECT_EQ(TUniqueId(), query_id); diff --git a/be/src/service/internal-server.cc b/be/src/service/internal-server.cc index 829b79225..19c0ae5db 100644 --- a/be/src/service/internal-server.cc +++ b/be/src/service/internal-server.cc @@ -34,7 +34,7 @@ using namespace std; namespace impala { Status ImpalaServer::OpenSession(const string& user_name, TUniqueId& new_session_id, - const TQueryOptions& query_opts) { + const QueryOptionMap& query_opts) { shared_ptr<ThriftServer::ConnectionContext> conn_ctx = make_shared<ThriftServer::ConnectionContext>(); conn_ctx->connection_id = RandomUniqueID(); @@ -58,14 +58,11 @@ Status ImpalaServer::OpenSession(const string& user_name, TUniqueId& new_session { lock_guard<mutex> l(session_state_map_lock_); session_state = session_state_map_[new_session_id]; - std::map<string, string> query_opts_map; - TQueryOptionsToMap(query_opts, &query_opts_map); - for (auto iter=query_opts_map.cbegin(); iter!=query_opts_map.cend(); iter++) { - if (!iter->second.empty()) { - RETURN_IF_ERROR(SetQueryOption(iter->first, iter->second, + } + + for (const auto& iter : query_opts) { + RETURN_IF_ERROR(SetQueryOption(iter.first, iter.second, &session_state->set_query_options, &session_state->set_query_options_mask)); - } - } } MarkSessionActive(session_state); @@ -102,7 +99,7 @@ bool ImpalaServer::CloseSession(const TUniqueId& session_id) { } // ImpalaServer::CloseSession Status ImpalaServer::ExecuteIgnoreResults(const string& user_name, const string& sql, - const TQueryOptions& query_opts, const bool persist_in_db, TUniqueId* query_id) { + const QueryOptionMap& query_opts, const bool persist_in_db, TUniqueId* query_id) { TUniqueId session_id; TUniqueId internal_query_id; Status result; @@ -130,7 +127,7 @@ Status ImpalaServer::ExecuteAndFetchAllText(const std::string& user_name, TUniqueId internal_query_id; Status result; - result = SubmitAndWait(user_name, sql, session_id, internal_query_id, TQueryOptions()); + result = SubmitAndWait(user_name, sql, 
session_id, internal_query_id); if (query_id != nullptr) { *query_id = internal_query_id; @@ -150,7 +147,7 @@ Status ImpalaServer::ExecuteAndFetchAllText(const std::string& user_name, } // ImpalaServer::ExecuteAndFetchAllText Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql, - TUniqueId& new_session_id, TUniqueId& new_query_id, const TQueryOptions& query_opts, + TUniqueId& new_session_id, TUniqueId& new_query_id, const QueryOptionMap& query_opts, const bool persist_in_db) { RETURN_IF_ERROR(OpenSession(user_name, new_session_id, query_opts)); @@ -200,11 +197,8 @@ Status ImpalaServer::SubmitQuery(const string& sql, const TUniqueId& session_id, session_state->ToThrift(session_state->session_id, &query_context.session); - QueryOptionsMask set_query_options_mask; query_context.client_request.query_options = session_state->QueryOptions(); - set_query_options_mask = session_state->set_query_options_mask; - - AddPoolConfiguration(&query_context, ~set_query_options_mask); + AddPoolConfiguration(&query_context, ~session_state->set_query_options_mask); QueryHandle query_handle; RETURN_IF_ERROR(Execute(&query_context, session_state, &query_handle, nullptr, diff --git a/be/src/service/internal-server.h b/be/src/service/internal-server.h index aedaeb058..65e78bdee 100644 --- a/be/src/service/internal-server.h +++ b/be/src/service/internal-server.h @@ -17,6 +17,7 @@ #pragma once +#include <map> #include <memory> #include <string> #include <vector> @@ -59,6 +60,8 @@ namespace impala { public: virtual ~InternalServer() {} + using QueryOptionMap = std::map<TImpalaQueryOptions::type, string>; + /// Creates and registers a new connection and session. /// /// Parameters: @@ -72,7 +75,7 @@ namespace impala { /// Return: /// `impala::Status` Indicates the result of opening the new session. 
virtual Status OpenSession(const std::string& user_name, TUniqueId& new_session_id, - const TQueryOptions& query_opts = TQueryOptions()) = 0; + const QueryOptionMap& query_opts = {}) = 0; /// Closes a given session cleaning up all associated resources. /// @@ -105,7 +108,7 @@ namespace impala { /// `impala::Status` Indicates the result of submitting the query and waiting for /// it to return. virtual Status ExecuteIgnoreResults(const std::string& user_name, - const std::string& sql, const TQueryOptions& query_opts = TQueryOptions(), + const std::string& sql, const QueryOptionMap& query_opts = {}, const bool persist_in_db = true, TUniqueId* query_id = nullptr) = 0; /// Creates a new session under the specified user and submits a query under that @@ -162,8 +165,7 @@ namespace impala { /// `impala::Status` Indicates the result of submitting and waiting for the query. virtual Status SubmitAndWait(const std::string& user_name, const std::string& sql, TUniqueId& new_session_id, TUniqueId& new_query_id, - const TQueryOptions& query_opts = TQueryOptions(), - const bool persist_in_db = true) = 0; + const QueryOptionMap& query_opts = {}, const bool persist_in_db = true) = 0; /// Waits until the given query has results available. 
/// diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 10b4f3fbc..7f52e943c 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -231,9 +231,13 @@ Status impala::SetQueryOption(const string& key, const string& value, if (option_int < 0) { return Status(Substitute("Invalid query option: $0", key)); } + return SetQueryOption(static_cast<TImpalaQueryOptions::type>(option_int), + value, query_options, set_query_options_mask); +} +Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& value, + TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask) { QueryConstants qc; - TImpalaQueryOptions::type option = static_cast<TImpalaQueryOptions::type>(option_int); if (value.empty()) { ResetQueryOption(option, query_options); @@ -1286,6 +1290,7 @@ Status impala::SetQueryOption(const string& key, const string& value, break; } default: + string key = to_string(option); if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; return Status::OK(); diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 127f1ddb2..e9e23a38a 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -371,6 +371,12 @@ void OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMask& mask, Status SetQueryOption(const std::string& key, const std::string& value, TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask); +/// Set the key/value pair in TQueryOptions. It will override existing setting in +/// query_options. The bit corresponding to query option 'key' in set_query_options_mask +/// is set. An empty string value will reset the key to its default value. 
+Status SetQueryOption(TImpalaQueryOptions::type option, const std::string& value, + TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask); + /// Validates the query options after they have all been set. Returns a Status indicating /// the results of running the validation rules. The majority of the query options /// validation is done in SetQueryOption. However, more complex validations rules (e.g. diff --git a/be/src/service/workload-management-flags.cc b/be/src/service/workload-management-flags.cc index 81ec53bf1..b71bef2cc 100644 --- a/be/src/service/workload-management-flags.cc +++ b/be/src/service/workload-management-flags.cc @@ -79,7 +79,7 @@ DEFINE_int32_hidden(query_log_write_timeout_s, 0, "Specifies the query timeout i "seconds for inserts to the query log table. A value less than 1 indicates to use " "the same value as the query_log_write_interval_s flag."); -DEFINE_int32(query_log_max_queued, 10000, "Maximum number of records that can be queued " +DEFINE_int32(query_log_max_queued, 5000, "Maximum number of records that can be queued " "before they are written to the impala query log table. This flag operates " "independently of the 'query_log_write_interval_s' flag. 
If the number of queued " "records reaches this value, the records will be written to the query log table no " diff --git a/be/src/service/workload-management.cc b/be/src/service/workload-management.cc index ca570e787..cf88365d3 100644 --- a/be/src/service/workload-management.cc +++ b/be/src/service/workload-management.cc @@ -45,8 +45,11 @@ #include "service/internal-server.h" #include "service/query-state-record.h" #include "util/debug-util.h" +#include "util/histogram-metric.h" #include "util/impalad-metrics.h" #include "util/metrics.h" +#include "util/pretty-printer.h" +#include "util/stopwatch.h" #include "util/string-util.h" #include "util/thread.h" #include "util/ticker.h" @@ -78,7 +81,7 @@ static const string DB = "sys"; /// Default query options that will be provided on all queries that insert rows into the /// completed queries table. See the initialization code in the /// ImpalaServer::CompletedQueriesThread function for details on which options are set. -static TQueryOptions insert_query_opts; +static InternalServer::QueryOptionMap insert_query_opts; /// Non-values portion of the sql DML to insert records into the completed queries table. /// Generates the first portion of the DML that inserts records into the completed queries @@ -98,11 +101,11 @@ static inline bool MaxRecordsExceeded(size_t record_count) noexcept { /// Sets up the sys database generating and executing the necessary DML statements. 
static const Status SetupDb(InternalServer* server) { - insert_query_opts.__set_sync_ddl(true); + insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true"; RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user, StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT " "'System database for Impala introspection'"), insert_query_opts, false)); - insert_query_opts.__set_sync_ddl(false); + insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false"; return Status::OK(); } // function SetupDb @@ -116,7 +119,7 @@ static string GetColumnName(const FieldDefinition& field) { /// Sets up the query table by generating and executing the necessary DML statements. static const Status SetupTable(InternalServer* server, const string& table_name, bool is_system_table = false) { - insert_query_opts.__set_sync_ddl(true); + insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true"; StringStreamPop create_table_sql; create_table_sql << "CREATE TABLE IF NOT EXISTS " << table_name << "("; @@ -157,7 +160,7 @@ static const Status SetupTable(InternalServer* server, const string& table_name, RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user, create_table_sql.str(), insert_query_opts, false)); - insert_query_opts.__set_sync_ddl(false); + insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false"; LOG(INFO) << "Completed " << table_name << " initialization. write_interval=\"" << FLAGS_query_log_write_interval_s << "s\""; @@ -320,11 +323,12 @@ void ImpalaServer::CompletedQueriesThread() { } // Setup default query options. - insert_query_opts.__set_timezone("UTC"); - insert_query_opts.__set_query_timeout_s((FLAGS_query_log_write_timeout_s < 1 ? - FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s)); + insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC"; + insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string( + FLAGS_query_log_write_timeout_s < 1 ? 
+ FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s); if (!FLAGS_query_log_request_pool.empty()) { - insert_query_opts.__set_request_pool(FLAGS_query_log_request_pool); + insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] = FLAGS_query_log_request_pool; } // Fully qualified table name based on startup flags. @@ -388,9 +392,6 @@ void ImpalaServer::CompletedQueriesThread() { }); completed_queries_ticker_->ResetWakeupGuard(); - // transfer all currently queued completed queries to another list for processing - // so that the completed queries queue is not blocked while creating and executing the - // DML to insert into the query log table if (!completed_queries_.empty()) { if (MaxRecordsExceeded(completed_queries_.size())) { ImpaladMetrics::COMPLETED_QUERIES_MAX_RECORDS_WRITES->Increment(1L); @@ -398,6 +399,9 @@ void ImpalaServer::CompletedQueriesThread() { ImpaladMetrics::COMPLETED_QUERIES_SCHEDULED_WRITES->Increment(1L); } + MonotonicStopWatch timer; + timer.Start(); + // Copy all completed queries to a temporary list so that inserts to the // completed_queries list are not blocked while generating and running an insert // SQL statement for the completed queries. @@ -440,10 +444,11 @@ void ImpalaServer::CompletedQueriesThread() { sql.pop_back(); const size_t final_sql_len = _insert_dml.size() + sql.size(); + uint64_t gather_time = timer.Reset(); TUniqueId tmp_query_id; // Build query options to ensure the query is not rejected. - TQueryOptions opts = insert_query_opts; + InternalServer::QueryOptionMap opts = insert_query_opts; if (UNLIKELY(final_sql_len > numeric_limits<int32_t>::max())) { LOG(ERROR) << "Completed queries table insert sql statement of length '" << @@ -452,26 +457,40 @@ void ImpalaServer::CompletedQueriesThread() { continue; // NOTE: early loop continuation } - opts.__set_max_statement_length_bytes(final_sql_len < 1024 ? 
1024 : - final_sql_len); - opts.__set_max_row_size(max_row_size); + // Set max_statement_length_bytes based on actual query, and at least the minimum. + opts[TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES] = std::to_string( + max<size_t>(MIN_MAX_STATEMENT_LENGTH_BYTES, final_sql_len)); + // Set statement_expression_limit based on actual query, and at least the minimum. + opts[TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT] = std::to_string( + max<size_t>(MIN_STATEMENT_EXPRESSION_LIMIT, + queries_to_insert.size() * _TQueryTableColumn_VALUES_TO_NAMES.size())); + opts[TImpalaQueryOptions::MAX_ROW_SIZE] = std::to_string(max_row_size); // Execute the insert dml. const Status ret_status = internal_server_->ExecuteIgnoreResults( FLAGS_workload_mgmt_user, StrCat(_insert_dml, sql), opts, false, &tmp_query_id); + uint64_t exec_time = timer.ElapsedTime(); + ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update( + gather_time + exec_time); if (ret_status.ok()) { LOG(INFO) << "wrote completed queries table=\"" << log_table_name << "\" " - "record_count=\"" << queries_to_insert.size() << "\""; + "record_count=" << queries_to_insert.size() << " " + "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " " + "gather_time=" << PrettyPrinter::Print(gather_time, TUnit::TIME_NS) << " " + "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS); ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment( queries_to_insert.size() * -1); DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0); ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment( queries_to_insert.size()); } else { - LOG(WARNING) << "failed to write completed queries table=\"" << - log_table_name << "\" record_count=\"" << queries_to_insert.size() << "\""; + LOG(WARNING) << "failed to write completed queries table=\"" << log_table_name + << "\" record_count=" << queries_to_insert.size() << " " + "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " " + "gather_time=" << PrettyPrinter::Print(gather_time, 
TUnit::TIME_NS) << " " + "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS); LOG(WARNING) << ret_status.GetDetail(); ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size()); completed_queries_lock_.lock(); diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc index 2079755a4..2e85d76b2 100644 --- a/be/src/util/impalad-metrics.cc +++ b/be/src/util/impalad-metrics.cc @@ -176,6 +176,8 @@ const char* ImpaladMetricKeys::COMPLETED_QUERIES_SCHEDULED_WRITES = "impala-server.completed-queries.scheduled-writes"; const char* ImpaladMetricKeys::COMPLETED_QUERIES_MAX_RECORDS_WRITES = "impala-server.completed-queries.max-records-writes"; +const char* ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS = + "impala-server.completed-queries.write-durations"; const char* ImpaladMetricKeys::DEBUG_ACTION_NUM_FAIL = "impala.debug_action.fail"; const char* ImpaladMetricKeys::QUERY_LOG_EST_TOTAL_BYTES = "impala-server.query-log-est-total-bytes"; @@ -272,6 +274,7 @@ StringProperty* ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS = nullptr; // Histograms HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = nullptr; HistogramMetric* ImpaladMetrics::DDL_DURATIONS = nullptr; +HistogramMetric* ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS = nullptr; // Other StatsMetric<uint64_t, StatsType::MEAN>* @@ -459,6 +462,9 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) { MetricDefs::Get(ImpaladMetricKeys::QUERY_DURATIONS), FIVE_HOURS_IN_MS, 3)); DDL_DURATIONS = m->RegisterMetric(new HistogramMetric( MetricDefs::Get(ImpaladMetricKeys::DDL_DURATIONS), FIVE_HOURS_IN_MS, 3)); + COMPLETED_QUERIES_WRITE_DURATIONS = m->RegisterMetric(new HistogramMetric( + MetricDefs::Get(ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS), + FIVE_HOURS_IN_MS, 3)); // Initialize Hedged read metrics HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0); diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h index 
c30ff0e53..2dcf2c568 100644 --- a/be/src/util/impalad-metrics.h +++ b/be/src/util/impalad-metrics.h @@ -284,6 +284,9 @@ class ImpaladMetricKeys { /// Number of writes to the query log table that happened because the max queued /// completed queries records was reached. static const char* COMPLETED_QUERIES_MAX_RECORDS_WRITES; + + /// Time spent writing completed queries to the query log table. + static const char* COMPLETED_QUERIES_WRITE_DURATIONS; }; /// Global impalad-wide metrics. This is useful for objects that want to update metrics @@ -383,6 +386,7 @@ class ImpaladMetrics { // Histograms static HistogramMetric* QUERY_DURATIONS; static HistogramMetric* DDL_DURATIONS; + static HistogramMetric* COMPLETED_QUERIES_WRITE_DURATIONS; // Other static StatsMetric<uint64_t, StatsType::MEAN>* IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO; diff --git a/common/thrift/SystemTables.thrift b/common/thrift/SystemTables.thrift index a67282609..a66e27cc5 100644 --- a/common/thrift/SystemTables.thrift +++ b/common/thrift/SystemTables.thrift @@ -20,6 +20,8 @@ namespace java org.apache.impala.thrift # Must be kept in-sync with workload-management-fields.cc # Used as column names, so do not change existing enums. 
+# When adding new columns, review the default for query_log_max_queued to maintain +# query_log_max_queued * len(TQueryTableColumn) < statement_expression_limit(250k) enum TQueryTableColumn { CLUSTER_ID QUERY_ID diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json index 5aee94d49..d77415dfc 100644 --- a/common/thrift/metrics.json +++ b/common/thrift/metrics.json @@ -3899,9 +3899,19 @@ "contexts": [ "IMPALAD" ], - "label": "Max Records Hit", + "label": "Completed Queries Max Records Hit", "units": "NONE", "kind": "COUNTER", "key": "impala-server.completed-queries.max-records-writes" + }, + { + "description": "Time spent writing completed queries to the query log table.", + "contexts": [ + "IMPALAD" + ], + "label": "Completed Queries Write Duration Distribution", + "units": "TIME_NS", + "kind": "HISTOGRAM", + "key": "impala-server.completed-queries.write-durations" } ] diff --git a/tests/custom_cluster/test_query_log.py b/tests/custom_cluster/test_query_log.py index 9c393f60e..a81f92ca5 100644 --- a/tests/custom_cluster/test_query_log.py +++ b/tests/custom_cluster/test_query_log.py @@ -18,7 +18,6 @@ from __future__ import absolute_import, division, print_function import os -import pytest import string import tempfile @@ -94,7 +93,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): MAX_SQL_PLAN_LEN = 2000 LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes") FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + str(int(time())) - FLUSH_MAX_RECORDS_QUERY_COUNT = 2 + FLUSH_MAX_RECORDS_QUERY_COUNT = 30 OTHER_TBL = "completed_queries_table_{0}".format(int(time())) @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " @@ -310,10 +309,13 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): .format(FLUSH_MAX_RECORDS_QUERY_COUNT, FLUSH_MAX_RECORDS_CLUSTER_ID), catalogd_args="--enable_workload_mgmt", + default_query_options=[ + ('statement_expression_limit', 1024)], impalad_graceful_shutdown=True) def 
test_query_log_flush_max_records(self, vector): """Asserts that queries that have completed are written to the query log table when - the maximum number of queued records it reached.""" + the maximum number of queued records it reached. Also verifies that writing + completed queries is not limited by default statement_expression_limit.""" impalad = self.cluster.get_first_impalad() client = self.get_client(vector.get_value('protocol')) @@ -341,7 +343,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): impalad.service.wait_for_metric_value( "impala-server.completed-queries.max-records-writes", 1, 60) self.cluster.get_first_impalad().service.wait_for_metric_value( - "impala-server.completed-queries.written", 3, 60) + "impala-server.completed-queries.written", + self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1, 60) # Force Impala to process the inserts to the completed queries table. client.execute("refresh " + self.QUERY_TBL) @@ -352,7 +355,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase): .format(self.QUERY_TBL, rand_str)) assert res.success assert 1 == len(res.data) - assert "3" == res.data[0] + assert str(self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1) == res.data[0] impalad.service.wait_for_metric_value( "impala-server.completed-queries.queued", 2, 60)