This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch branch-4.4.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 73f13f0e9f225400bb641c48da57a9871b5d8383
Author: Michael Smith <michael.sm...@cloudera.com>
AuthorDate: Wed Apr 17 16:54:00 2024 -0700

    IMPALA-13012: Lower default query_log_max_queued
    
    Sets the query_log_max_queued default such that
    
      query_log_max_queued * num_columns(49) < statement_expression_limit
    
    to avoid triggering e.g.
    
      AnalysisException: Exceeded the statement expression limit (250000)
      Statement has 370039 expressions.
    
    Also increases statement_expression_limit for insertion to avoid an
    error if query_log_max_queued is changed.
    
    Logs the time taken to write to the queries table to help with debugging
    and adds the histogram "impala-server.completed-queries.write-durations".
    
    Fixes InternalServer so it uses 'default_query_options'.
    
    Change-Id: I6535675307d88cb65ba7d908f3c692e0cf3259d7
    Reviewed-on: http://gerrit.cloudera.org:8080/21351
    Reviewed-by: Michael Smith <michael.sm...@cloudera.com>
    Tested-by: Michael Smith <michael.sm...@cloudera.com>
    Reviewed-by: Riza Suminto <riza.sumi...@cloudera.com>
    (cherry picked from commit ba32d70891fd68c5c1234ed543b74c51661bf272)
---
 be/src/service/impala-server.h              |  7 ++--
 be/src/service/internal-server-test.cc      | 36 ++++++++----------
 be/src/service/internal-server.cc           | 24 +++++-------
 be/src/service/internal-server.h            | 10 +++--
 be/src/service/query-options.cc             |  7 +++-
 be/src/service/query-options.h              |  6 +++
 be/src/service/workload-management-flags.cc |  2 +-
 be/src/service/workload-management.cc       | 57 +++++++++++++++++++----------
 be/src/util/impalad-metrics.cc              |  6 +++
 be/src/util/impalad-metrics.h               |  4 ++
 common/thrift/SystemTables.thrift           |  2 +
 common/thrift/metrics.json                  | 12 +++++-
 tests/custom_cluster/test_query_log.py      | 13 ++++---
 13 files changed, 115 insertions(+), 71 deletions(-)

diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index e3562959d..b1ef8fa4d 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -410,18 +410,17 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// InternalServer methods, see internal-server.h for details
   virtual Status OpenSession(const std::string& user_name, TUniqueId& 
new_session_id,
-      const TQueryOptions& query_opts = TQueryOptions());
+      const QueryOptionMap& query_opts = {});
   virtual bool CloseSession(const impala::TUniqueId& session_id);
   virtual Status ExecuteIgnoreResults(const std::string& user_name,
-      const std::string& sql, const TQueryOptions& query_opts = 
TQueryOptions(),
+      const std::string& sql, const QueryOptionMap& query_opts = {},
       const bool persist_in_db = true, TUniqueId* query_id = nullptr);
   virtual Status ExecuteAndFetchAllText(const std::string& user_name,
       const std::string& sql, query_results& results, results_columns* columns 
= nullptr,
       TUniqueId* query_id = nullptr);
   virtual Status SubmitAndWait(const std::string& user_name, const 
std::string& sql,
       TUniqueId& new_session_id, TUniqueId& new_query_id,
-      const TQueryOptions& query_opts = TQueryOptions(),
-      const bool persist_in_db = true);
+      const QueryOptionMap& query_opts = {}, const bool persist_in_db = true);
   virtual Status WaitForResults(TUniqueId& query_id);
   virtual Status SubmitQuery(const std::string& sql, const impala::TUniqueId& 
session_id,
       TUniqueId& new_query_id, const bool persist_in_db = true);
diff --git a/be/src/service/internal-server-test.cc 
b/be/src/service/internal-server-test.cc
index c1cb1ab79..ecc184283 100644
--- a/be/src/service/internal-server-test.cc
+++ b/be/src/service/internal-server-test.cc
@@ -130,7 +130,7 @@ class DatabaseTest {
       TUniqueId query_id;
       EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", StrCat("create 
database ",
           database_name_, " comment 'Temporary database created and managed by 
"
-          "internal-server-test'"), TQueryOptions(), false, &query_id));
+          "internal-server-test'"), {}, false, &query_id));
       assertQueryState(query_id, QUERY_STATE_SUCCESS);
 
       if (create_table) {
@@ -138,7 +138,7 @@ class DatabaseTest {
         EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", 
StrCat("create table ",
             table_name_, "(id INT, name STRING, first_sold TIMESTAMP, "
             "last_sold TIMESTAMP, price DECIMAL(30, 2)) partitioned by 
(category INT)"),
-            TQueryOptions(), false, &query_id));
+            {}, false, &query_id));
         assertQueryState(query_id, QUERY_STATE_SUCCESS);
 
         // Insert some products that have a last_sold time.
@@ -162,7 +162,7 @@ class DatabaseTest {
           }
         }
 
-        EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql1, 
TQueryOptions(),
+        EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql1, {},
             false, &query_id));
         assertQueryState(query_id, QUERY_STATE_SUCCESS);
 
@@ -183,7 +183,7 @@ class DatabaseTest {
           }
         }
 
-        EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql2, 
TQueryOptions(),
+        EXPECT_OK(impala_server_->ExecuteIgnoreResults("impala", sql2, {},
             false, &query_id));
         assertQueryState(query_id, QUERY_STATE_SUCCESS);
       }
@@ -221,13 +221,11 @@ TEST(InternalServerTest, QueryTimeout) {
   DatabaseTest db_test = DatabaseTest(impala_server_, "query_timeout", true, 
5);
   InternalServer* fixture = impala_server_.get();
 
-  TQueryOptions query_opts;
-  query_opts.__set_fetch_rows_timeout_ms(1);
-
   TUniqueId session_id;
   TUniqueId query_id;
 
-  ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts));
+  ASSERT_OK(fixture->OpenSession("impala", session_id,
+      {{TImpalaQueryOptions::FETCH_ROWS_TIMEOUT_MS, "1"}}));
 
   // Run a query that will execute for longer than the configured exec timeout.
   ASSERT_OK(fixture->SubmitQuery(StrCat("select * from ", 
db_test.GetTableName(),
@@ -249,12 +247,10 @@ TEST(InternalServerTest, QueryTimeout) {
 // Asserts the expected error is returned when a query option is set to an 
invalid value.
 TEST(InternalServerTest, InvalidQueryOption) {
   InternalServer* fixture = impala_server_.get();
-  TQueryOptions query_opts;
-
-  query_opts.__set_mem_limit_executors(-2);
 
   TUniqueId session_id;
-  Status stat = fixture->OpenSession("impala", session_id, query_opts);
+  Status stat = fixture->OpenSession("impala", session_id,
+      {{TImpalaQueryOptions::MEM_LIMIT_EXECUTORS, "-2"}});
 
   ASSERT_FALSE(stat.ok());
   ASSERT_EQ("Failed to parse query option 'MEM_LIMIT_EXECUTORS': -2", 
stat.msg().msg());
@@ -281,7 +277,7 @@ TEST(InternalServerTest, MultipleQueriesMultipleSessions) {
   // Insert a record into the test table using a new session.
   ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("insert into ",
       test_table_name, "(id,first_name,last_name) VALUES 
(1,'test','person1')"),
-      TQueryOptions(), false, &query_id));
+      {}, false, &query_id));
   assertQueryState(query_id, QUERY_STATE_SUCCESS);
 
   // Select a record from the test table using a new session.
@@ -317,10 +313,8 @@ TEST(InternalServerTest, RetryFailedQuery) {
       StrCat("IMPALA_SERVICE_POOL:127.0.0.1:",FLAGS_krpc_port,
       ":ExecQueryFInstances:FAIL"));
 
-  TQueryOptions query_opts;
-  query_opts.__set_retry_failed_queries(true);
-
-  ASSERT_OK(fixture->OpenSession("impala", session_id, query_opts));
+  ASSERT_OK(fixture->OpenSession("impala", session_id,
+      {{TImpalaQueryOptions::RETRY_FAILED_QUERIES, "true"}}));
 
   // Run a query that will fail and get automatically retried.
   ASSERT_OK(fixture->SubmitQuery("select 1", session_id, query_id));
@@ -415,7 +409,7 @@ TEST(InternalServerTest, MissingClosingQuote) {
 
   const string expected_msg = "ParseException: Unmatched string literal";
   res = fixture->ExecuteIgnoreResults("impala",StrCat( "select * from ",
-      db_test.GetTableName(), " where name = 'foo"), TQueryOptions(), false, 
&query_id);
+      db_test.GetTableName(), " where name = 'foo"), {}, false, &query_id);
   EXPECT_EQ(TErrorCode::GENERAL, res.code());
   EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
   EXPECT_EQ(TUniqueId(), query_id);
@@ -429,7 +423,7 @@ TEST(InternalServerTest, SyntaxError) {
 
   const string expected_msg = "ParseException: Syntax error in line 1";
   res = fixture->ExecuteIgnoreResults("impala", StrCat("select * from ",
-      db_test.GetTableName(), "; select"), TQueryOptions(), false, &query_id);
+      db_test.GetTableName(), "; select"), {}, false, &query_id);
   EXPECT_EQ(TErrorCode::GENERAL, res.code());
   EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
   EXPECT_EQ(TUniqueId(), query_id);
@@ -441,7 +435,7 @@ TEST(InternalServerTest, UnclosedComment) {
   Status res;
 
   const string expected_msg = "ParseException: Syntax error in line 1";
-  res = fixture->ExecuteIgnoreResults("impala", "select 1 /*foo", 
TQueryOptions(), false,
+  res = fixture->ExecuteIgnoreResults("impala", "select 1 /*foo", {}, false,
       &query_id);
   EXPECT_EQ(TErrorCode::GENERAL, res.code());
   EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
@@ -459,7 +453,7 @@ TEST(InternalServerTest, TableNotExist) {
       ASSERT_OK(fixture->ExecuteIgnoreResults("impala", StrCat("drop table ",
       db_test.GetTableName(), " purge")));
   res = fixture->ExecuteIgnoreResults("impala", StrCat("select * from ",
-      db_test.GetTableName()), TQueryOptions(), false, &query_id);
+      db_test.GetTableName()), {}, false, &query_id);
   EXPECT_EQ(TErrorCode::GENERAL, res.code());
   EXPECT_EQ(expected_msg, res.msg().msg().substr(0, expected_msg.length()));
   EXPECT_EQ(TUniqueId(), query_id);
diff --git a/be/src/service/internal-server.cc 
b/be/src/service/internal-server.cc
index 829b79225..19c0ae5db 100644
--- a/be/src/service/internal-server.cc
+++ b/be/src/service/internal-server.cc
@@ -34,7 +34,7 @@ using namespace std;
 namespace impala {
 
 Status ImpalaServer::OpenSession(const string& user_name, TUniqueId& 
new_session_id,
-    const TQueryOptions& query_opts) {
+    const QueryOptionMap& query_opts) {
   shared_ptr<ThriftServer::ConnectionContext> conn_ctx =
       make_shared<ThriftServer::ConnectionContext>();
   conn_ctx->connection_id = RandomUniqueID();
@@ -58,14 +58,11 @@ Status ImpalaServer::OpenSession(const string& user_name, 
TUniqueId& new_session
   {
     lock_guard<mutex> l(session_state_map_lock_);
     session_state = session_state_map_[new_session_id];
-    std::map<string, string> query_opts_map;
-    TQueryOptionsToMap(query_opts, &query_opts_map);
-    for (auto iter=query_opts_map.cbegin(); iter!=query_opts_map.cend(); 
iter++) {
-      if (!iter->second.empty()) {
-        RETURN_IF_ERROR(SetQueryOption(iter->first, iter->second,
+  }
+
+  for (const auto& iter : query_opts) {
+    RETURN_IF_ERROR(SetQueryOption(iter.first, iter.second,
         &session_state->set_query_options, 
&session_state->set_query_options_mask));
-      }
-    }
   }
 
   MarkSessionActive(session_state);
@@ -102,7 +99,7 @@ bool ImpalaServer::CloseSession(const TUniqueId& session_id) 
{
 } // ImpalaServer::CloseSession
 
 Status ImpalaServer::ExecuteIgnoreResults(const string& user_name, const 
string& sql,
-    const TQueryOptions& query_opts, const bool persist_in_db, TUniqueId* 
query_id) {
+    const QueryOptionMap& query_opts, const bool persist_in_db, TUniqueId* 
query_id) {
   TUniqueId session_id;
   TUniqueId internal_query_id;
   Status result;
@@ -130,7 +127,7 @@ Status ImpalaServer::ExecuteAndFetchAllText(const 
std::string& user_name,
   TUniqueId internal_query_id;
   Status result;
 
-  result = SubmitAndWait(user_name, sql, session_id, internal_query_id, 
TQueryOptions());
+  result = SubmitAndWait(user_name, sql, session_id, internal_query_id);
 
   if (query_id != nullptr) {
     *query_id = internal_query_id;
@@ -150,7 +147,7 @@ Status ImpalaServer::ExecuteAndFetchAllText(const 
std::string& user_name,
 } // ImpalaServer::ExecuteAndFetchAllText
 
 Status ImpalaServer::SubmitAndWait(const string& user_name, const string& sql,
-    TUniqueId& new_session_id, TUniqueId& new_query_id, const TQueryOptions& 
query_opts,
+    TUniqueId& new_session_id, TUniqueId& new_query_id, const QueryOptionMap& 
query_opts,
     const bool persist_in_db) {
 
   RETURN_IF_ERROR(OpenSession(user_name, new_session_id, query_opts));
@@ -200,11 +197,8 @@ Status ImpalaServer::SubmitQuery(const string& sql, const 
TUniqueId& session_id,
 
   session_state->ToThrift(session_state->session_id, &query_context.session);
 
-  QueryOptionsMask set_query_options_mask;
   query_context.client_request.query_options = session_state->QueryOptions();
-  set_query_options_mask = session_state->set_query_options_mask;
-
-  AddPoolConfiguration(&query_context, ~set_query_options_mask);
+  AddPoolConfiguration(&query_context, ~session_state->set_query_options_mask);
 
   QueryHandle query_handle;
   RETURN_IF_ERROR(Execute(&query_context, session_state, &query_handle, 
nullptr,
diff --git a/be/src/service/internal-server.h b/be/src/service/internal-server.h
index aedaeb058..65e78bdee 100644
--- a/be/src/service/internal-server.h
+++ b/be/src/service/internal-server.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <map>
 #include <memory>
 #include <string>
 #include <vector>
@@ -59,6 +60,8 @@ namespace impala {
     public:
       virtual ~InternalServer() {}
 
+      using QueryOptionMap = std::map<TImpalaQueryOptions::type, string>;
+
       /// Creates and registers a new connection and session.
       ///
       /// Parameters:
@@ -72,7 +75,7 @@ namespace impala {
       /// Return:
       ///   `impala::Status` Indicates the result of opening the new session.
       virtual Status OpenSession(const std::string& user_name, TUniqueId& 
new_session_id,
-          const TQueryOptions& query_opts = TQueryOptions()) = 0;
+          const QueryOptionMap& query_opts = {}) = 0;
 
       /// Closes a given session cleaning up all associated resources.
       ///
@@ -105,7 +108,7 @@ namespace impala {
       ///   `impala::Status` Indicates the result of submitting the query and 
waiting for
       ///                    it to return.
       virtual Status ExecuteIgnoreResults(const std::string& user_name,
-          const std::string& sql, const TQueryOptions& query_opts = 
TQueryOptions(),
+          const std::string& sql, const QueryOptionMap& query_opts = {},
           const bool persist_in_db = true, TUniqueId* query_id = nullptr) = 0;
 
       /// Creates a new session under the specified user and submits a query 
under that
@@ -162,8 +165,7 @@ namespace impala {
       ///   `impala::Status` Indicates the result of submitting and waiting 
for the query.
       virtual Status SubmitAndWait(const std::string& user_name, const 
std::string& sql,
           TUniqueId& new_session_id, TUniqueId& new_query_id,
-          const TQueryOptions& query_opts = TQueryOptions(),
-          const bool persist_in_db = true) = 0;
+          const QueryOptionMap& query_opts = {}, const bool persist_in_db = 
true) = 0;
 
       /// Waits until the given query has results available.
       ///
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 10b4f3fbc..7f52e943c 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -231,9 +231,13 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
   if (option_int < 0) {
     return Status(Substitute("Invalid query option: $0", key));
   }
+  return SetQueryOption(static_cast<TImpalaQueryOptions::type>(option_int),
+      value, query_options, set_query_options_mask);
+}
 
+Status impala::SetQueryOption(TImpalaQueryOptions::type option, const string& 
value,
+    TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask) {
   QueryConstants qc;
-  TImpalaQueryOptions::type option = 
static_cast<TImpalaQueryOptions::type>(option_int);
 
   if (value.empty()) {
     ResetQueryOption(option, query_options);
@@ -1286,6 +1290,7 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         break;
       }
       default:
+        string key = to_string(option);
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";
           return Status::OK();
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 127f1ddb2..e9e23a38a 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -371,6 +371,12 @@ void OverlayQueryOptions(const TQueryOptions& src, const 
QueryOptionsMask& mask,
 Status SetQueryOption(const std::string& key, const std::string& value,
     TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask);
 
+/// Set the option/value pair in TQueryOptions. It will override existing setting in
+/// query_options. The bit corresponding to 'option' in set_query_options_mask is set.
+/// An empty string value will reset the option to its default value.
+Status SetQueryOption(TImpalaQueryOptions::type option, const std::string& 
value,
+    TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask);
+
 /// Validates the query options after they have all been set. Returns a Status 
indicating
 /// the results of running the validation rules. The majority of the query 
options
 /// validation is done in SetQueryOption. However, more complex validations 
rules (e.g.
diff --git a/be/src/service/workload-management-flags.cc 
b/be/src/service/workload-management-flags.cc
index 81ec53bf1..b71bef2cc 100644
--- a/be/src/service/workload-management-flags.cc
+++ b/be/src/service/workload-management-flags.cc
@@ -79,7 +79,7 @@ DEFINE_int32_hidden(query_log_write_timeout_s, 0, "Specifies 
the query timeout i
     "seconds for inserts to the query log table. A value less than 1 indicates 
to use "
     "the same value as the query_log_write_interval_s flag.");
 
-DEFINE_int32(query_log_max_queued, 10000, "Maximum number of records that can 
be queued "
+DEFINE_int32(query_log_max_queued, 5000, "Maximum number of records that can 
be queued "
     "before they are written to the impala query log table. This flag operates 
"
     "independently of the 'query_log_write_interval_s' flag. If the number of 
queued "
     "records reaches this value, the records will be written to the query log 
table no "
diff --git a/be/src/service/workload-management.cc 
b/be/src/service/workload-management.cc
index ca570e787..cf88365d3 100644
--- a/be/src/service/workload-management.cc
+++ b/be/src/service/workload-management.cc
@@ -45,8 +45,11 @@
 #include "service/internal-server.h"
 #include "service/query-state-record.h"
 #include "util/debug-util.h"
+#include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
 #include "util/metrics.h"
+#include "util/pretty-printer.h"
+#include "util/stopwatch.h"
 #include "util/string-util.h"
 #include "util/thread.h"
 #include "util/ticker.h"
@@ -78,7 +81,7 @@ static const string DB = "sys";
 /// Default query options that will be provided on all queries that insert 
rows into the
 /// completed queries table. See the initialization code in the
 /// ImpalaServer::CompletedQueriesThread function for details on which options 
are set.
-static TQueryOptions insert_query_opts;
+static InternalServer::QueryOptionMap insert_query_opts;
 
 /// Non-values portion of the sql DML to insert records into the completed 
queries table.
 /// Generates the first portion of the DML that inserts records into the 
completed queries
@@ -98,11 +101,11 @@ static inline bool MaxRecordsExceeded(size_t record_count) 
noexcept {
 
 /// Sets up the sys database generating and executing the necessary DML 
statements.
 static const Status SetupDb(InternalServer* server) {
-  insert_query_opts.__set_sync_ddl(true);
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
   RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
       StrCat("CREATE DATABASE IF NOT EXISTS ", DB, " COMMENT "
       "'System database for Impala introspection'"), insert_query_opts, 
false));
-  insert_query_opts.__set_sync_ddl(false);
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
   return Status::OK();
 } // function SetupDb
 
@@ -116,7 +119,7 @@ static string GetColumnName(const FieldDefinition& field) {
 /// Sets up the query table by generating and executing the necessary DML 
statements.
 static const Status SetupTable(InternalServer* server, const string& 
table_name,
     bool is_system_table = false) {
-  insert_query_opts.__set_sync_ddl(true);
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "true";
 
   StringStreamPop create_table_sql;
   create_table_sql << "CREATE TABLE IF NOT EXISTS " << table_name << "(";
@@ -157,7 +160,7 @@ static const Status SetupTable(InternalServer* server, 
const string& table_name,
   RETURN_IF_ERROR(server->ExecuteIgnoreResults(FLAGS_workload_mgmt_user,
       create_table_sql.str(), insert_query_opts, false));
 
-  insert_query_opts.__set_sync_ddl(false);
+  insert_query_opts[TImpalaQueryOptions::SYNC_DDL] = "false";
 
   LOG(INFO) << "Completed " << table_name << " initialization. 
write_interval=\"" <<
       FLAGS_query_log_write_interval_s << "s\"";
@@ -320,11 +323,12 @@ void ImpalaServer::CompletedQueriesThread() {
   }
 
   // Setup default query options.
-  insert_query_opts.__set_timezone("UTC");
-  insert_query_opts.__set_query_timeout_s((FLAGS_query_log_write_timeout_s < 1 
?
-      FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s));
+  insert_query_opts[TImpalaQueryOptions::TIMEZONE] = "UTC";
+  insert_query_opts[TImpalaQueryOptions::QUERY_TIMEOUT_S] = std::to_string(
+      FLAGS_query_log_write_timeout_s < 1 ?
+      FLAGS_query_log_write_interval_s : FLAGS_query_log_write_timeout_s);
   if (!FLAGS_query_log_request_pool.empty()) {
-    insert_query_opts.__set_request_pool(FLAGS_query_log_request_pool);
+    insert_query_opts[TImpalaQueryOptions::REQUEST_POOL] = 
FLAGS_query_log_request_pool;
   }
 
   // Fully qualified table name based on startup flags.
@@ -388,9 +392,6 @@ void ImpalaServer::CompletedQueriesThread() {
         });
     completed_queries_ticker_->ResetWakeupGuard();
 
-    // transfer all currently queued completed queries to another list for 
processing
-    // so that the completed queries queue is not blocked while creating and 
executing the
-    // DML to insert into the query log table
     if (!completed_queries_.empty()) {
       if (MaxRecordsExceeded(completed_queries_.size())) {
         ImpaladMetrics::COMPLETED_QUERIES_MAX_RECORDS_WRITES->Increment(1L);
@@ -398,6 +399,9 @@ void ImpalaServer::CompletedQueriesThread() {
         ImpaladMetrics::COMPLETED_QUERIES_SCHEDULED_WRITES->Increment(1L);
       }
 
+      MonotonicStopWatch timer;
+      timer.Start();
+
       // Copy all completed queries to a temporary list so that inserts to the
       // completed_queries list are not blocked while generating and running 
an insert
       // SQL statement for the completed queries.
@@ -440,10 +444,11 @@ void ImpalaServer::CompletedQueriesThread() {
         sql.pop_back();
         const size_t final_sql_len = _insert_dml.size() + sql.size();
 
+        uint64_t gather_time = timer.Reset();
         TUniqueId tmp_query_id;
 
         // Build query options to ensure the query is not rejected.
-        TQueryOptions opts = insert_query_opts;
+        InternalServer::QueryOptionMap opts = insert_query_opts;
 
         if (UNLIKELY(final_sql_len > numeric_limits<int32_t>::max())) {
           LOG(ERROR) << "Completed queries table insert sql statement of 
length '" <<
@@ -452,26 +457,40 @@ void ImpalaServer::CompletedQueriesThread() {
           continue; // NOTE: early loop continuation
         }
 
-        opts.__set_max_statement_length_bytes(final_sql_len < 1024 ? 1024 :
-            final_sql_len);
-        opts.__set_max_row_size(max_row_size);
+        // Set max_statement_length_bytes based on actual query, and at least 
the minimum.
+        opts[TImpalaQueryOptions::MAX_STATEMENT_LENGTH_BYTES] = std::to_string(
+            max<size_t>(MIN_MAX_STATEMENT_LENGTH_BYTES, final_sql_len));
+        // Set statement_expression_limit based on actual query, and at least 
the minimum.
+        opts[TImpalaQueryOptions::STATEMENT_EXPRESSION_LIMIT] = std::to_string(
+            max<size_t>(MIN_STATEMENT_EXPRESSION_LIMIT,
+                queries_to_insert.size() * 
_TQueryTableColumn_VALUES_TO_NAMES.size()));
+        opts[TImpalaQueryOptions::MAX_ROW_SIZE] = std::to_string(max_row_size);
 
         // Execute the insert dml.
         const Status ret_status = internal_server_->ExecuteIgnoreResults(
             FLAGS_workload_mgmt_user, StrCat(_insert_dml, sql), opts, false,
             &tmp_query_id);
 
+        uint64_t exec_time = timer.ElapsedTime();
+        ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS->Update(
+            gather_time + exec_time);
         if (ret_status.ok()) {
           LOG(INFO) << "wrote completed queries table=\"" << log_table_name << 
"\" "
-              "record_count=\"" << queries_to_insert.size() << "\"";
+              "record_count=" << queries_to_insert.size() << " "
+              "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
+              "gather_time=" << PrettyPrinter::Print(gather_time, 
TUnit::TIME_NS) << " "
+              "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
           ImpaladMetrics::COMPLETED_QUERIES_QUEUED->Increment(
               queries_to_insert.size() * -1);
           DCHECK(ImpaladMetrics::COMPLETED_QUERIES_QUEUED->GetValue() >= 0);
           ImpaladMetrics::COMPLETED_QUERIES_WRITTEN->Increment(
               queries_to_insert.size());
         } else {
-          LOG(WARNING) << "failed to write completed queries table=\"" <<
-              log_table_name << "\" record_count=\"" << 
queries_to_insert.size() << "\"";
+          LOG(WARNING) << "failed to write completed queries table=\"" << 
log_table_name
+              << "\" record_count=" << queries_to_insert.size() << " "
+              "bytes=" << PrettyPrinter::PrintBytes(sql.size()) << " "
+              "gather_time=" << PrettyPrinter::Print(gather_time, 
TUnit::TIME_NS) << " "
+              "exec_time=" << PrettyPrinter::Print(exec_time, TUnit::TIME_NS);
           LOG(WARNING) << ret_status.GetDetail();
           
ImpaladMetrics::COMPLETED_QUERIES_FAIL->Increment(queries_to_insert.size());
           completed_queries_lock_.lock();
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 2079755a4..2e85d76b2 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -176,6 +176,8 @@ const char* 
ImpaladMetricKeys::COMPLETED_QUERIES_SCHEDULED_WRITES =
     "impala-server.completed-queries.scheduled-writes";
 const char* ImpaladMetricKeys::COMPLETED_QUERIES_MAX_RECORDS_WRITES =
     "impala-server.completed-queries.max-records-writes";
+const char* ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS =
+    "impala-server.completed-queries.write-durations";
 const char* ImpaladMetricKeys::DEBUG_ACTION_NUM_FAIL = 
"impala.debug_action.fail";
 const char* ImpaladMetricKeys::QUERY_LOG_EST_TOTAL_BYTES =
     "impala-server.query-log-est-total-bytes";
@@ -272,6 +274,7 @@ StringProperty* ImpaladMetrics::ACTIVE_CATALOGD_ADDRESS = 
nullptr;
 // Histograms
 HistogramMetric* ImpaladMetrics::QUERY_DURATIONS = nullptr;
 HistogramMetric* ImpaladMetrics::DDL_DURATIONS = nullptr;
+HistogramMetric* ImpaladMetrics::COMPLETED_QUERIES_WRITE_DURATIONS = nullptr;
 
 // Other
 StatsMetric<uint64_t, StatsType::MEAN>*
@@ -459,6 +462,9 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       MetricDefs::Get(ImpaladMetricKeys::QUERY_DURATIONS), FIVE_HOURS_IN_MS, 
3));
   DDL_DURATIONS = m->RegisterMetric(new HistogramMetric(
       MetricDefs::Get(ImpaladMetricKeys::DDL_DURATIONS), FIVE_HOURS_IN_MS, 3));
+  COMPLETED_QUERIES_WRITE_DURATIONS = m->RegisterMetric(new HistogramMetric(
+      MetricDefs::Get(ImpaladMetricKeys::COMPLETED_QUERIES_WRITE_DURATIONS),
+      FIVE_HOURS_IN_MS, 3));
 
   // Initialize Hedged read metrics
   HEDGED_READ_OPS = m->AddCounter(ImpaladMetricKeys::HEDGED_READ_OPS, 0);
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index c30ff0e53..2dcf2c568 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -284,6 +284,9 @@ class ImpaladMetricKeys {
   /// Number of writes to the query log table that happened because the max 
queued
   /// completed queries records was reached.
   static const char* COMPLETED_QUERIES_MAX_RECORDS_WRITES;
+
+  /// Time spent writing completed queries to the query log table.
+  static const char* COMPLETED_QUERIES_WRITE_DURATIONS;
 };
 
 /// Global impalad-wide metrics.  This is useful for objects that want to 
update metrics
@@ -383,6 +386,7 @@ class ImpaladMetrics {
   // Histograms
   static HistogramMetric* QUERY_DURATIONS;
   static HistogramMetric* DDL_DURATIONS;
+  static HistogramMetric* COMPLETED_QUERIES_WRITE_DURATIONS;
 
   // Other
   static StatsMetric<uint64_t, StatsType::MEAN>* 
IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO;
diff --git a/common/thrift/SystemTables.thrift 
b/common/thrift/SystemTables.thrift
index a67282609..a66e27cc5 100644
--- a/common/thrift/SystemTables.thrift
+++ b/common/thrift/SystemTables.thrift
@@ -20,6 +20,8 @@ namespace java org.apache.impala.thrift
 
 # Must be kept in-sync with workload-management-fields.cc
 # Used as column names, so do not change existing enums.
+# When adding new columns, review the default for query_log_max_queued to 
maintain
+#   query_log_max_queued * len(TQueryTableColumn) < 
statement_expression_limit(250k)
 enum TQueryTableColumn {
     CLUSTER_ID
     QUERY_ID
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 5aee94d49..d77415dfc 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -3899,9 +3899,19 @@
     "contexts": [
       "IMPALAD"
     ],
-    "label": "Max Records Hit",
+    "label": "Completed Queries Max Records Hit",
     "units": "NONE",
     "kind": "COUNTER",
     "key": "impala-server.completed-queries.max-records-writes"
+  },
+  {
+    "description": "Time spent writing completed queries to the query log 
table.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Completed Queries Write Duration Distribution",
+    "units": "TIME_NS",
+    "kind": "HISTOGRAM",
+    "key": "impala-server.completed-queries.write-durations"
   }
 ]
diff --git a/tests/custom_cluster/test_query_log.py 
b/tests/custom_cluster/test_query_log.py
index 9c393f60e..a81f92ca5 100644
--- a/tests/custom_cluster/test_query_log.py
+++ b/tests/custom_cluster/test_query_log.py
@@ -18,7 +18,6 @@
 from __future__ import absolute_import, division, print_function
 
 import os
-import pytest
 import string
 import tempfile
 
@@ -94,7 +93,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
   MAX_SQL_PLAN_LEN = 2000
   LOG_DIR_MAX_WRITES = tempfile.mkdtemp(prefix="max_writes")
   FLUSH_MAX_RECORDS_CLUSTER_ID = "test_query_log_max_records_" + 
str(int(time()))
-  FLUSH_MAX_RECORDS_QUERY_COUNT = 2
+  FLUSH_MAX_RECORDS_QUERY_COUNT = 30
   OTHER_TBL = "completed_queries_table_{0}".format(int(time()))
 
   @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt "
@@ -310,10 +309,13 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
                                                  
.format(FLUSH_MAX_RECORDS_QUERY_COUNT,
                                                  FLUSH_MAX_RECORDS_CLUSTER_ID),
                                     catalogd_args="--enable_workload_mgmt",
+                                    default_query_options=[
+                                      ('statement_expression_limit', 1024)],
                                     impalad_graceful_shutdown=True)
   def test_query_log_flush_max_records(self, vector):
     """Asserts that queries that have completed are written to the query log 
table when
-       the maximum number of queued records it reached."""
+       the maximum number of queued records is reached. Also verifies that writing
+       completed queries is not limited by the default statement_expression_limit."""
 
     impalad = self.cluster.get_first_impalad()
     client = self.get_client(vector.get_value('protocol'))
@@ -341,7 +343,8 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
     impalad.service.wait_for_metric_value(
         "impala-server.completed-queries.max-records-writes", 1, 60)
     self.cluster.get_first_impalad().service.wait_for_metric_value(
-        "impala-server.completed-queries.written", 3, 60)
+        "impala-server.completed-queries.written",
+        self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1, 60)
 
     # Force Impala to process the inserts to the completed queries table.
     client.execute("refresh " + self.QUERY_TBL)
@@ -352,7 +355,7 @@ class TestQueryLogTableBeeswax(TestQueryLogTableBase):
         .format(self.QUERY_TBL, rand_str))
     assert res.success
     assert 1 == len(res.data)
-    assert "3" == res.data[0]
+    assert str(self.FLUSH_MAX_RECORDS_QUERY_COUNT + 1) == res.data[0]
     impalad.service.wait_for_metric_value(
         "impala-server.completed-queries.queued", 2, 60)
 

Reply via email to