This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c6a0b76 IMPALA-13726 Add admission control slots to /queries page in 
webui
35c6a0b76 is described below

commit 35c6a0b76d6c41c6b120d0c1eae7023bee518502
Author: Andrew Sherman <[email protected]>
AuthorDate: Wed Jan 22 15:04:44 2025 -0800

    IMPALA-13726 Add admission control slots to /queries page in webui
    
    When tracking resource utilization it is useful to see how many
    admission control slots are being used by queries. Add the slots used
    by coordinator and executors to the webui queries tables. For
    implementation reasons this entails also adding these fields to the
    query history and live query tables.
    
    The executor admission control slots are calculated by looking at a
    single executor backend. In theory this single number could be
    misleading but in practice queries are expected to have symmetrical
    slots across executors.
    
    Bump the schema number for the query history schema, and add some new
    tests.
    
    Change-Id: I057493b7767902a417dfeb75cdaeffd452d66789
    Reviewed-on: http://gerrit.cloudera.org:8080/22443
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/system-table-scanner.cc                | 10 +++
 be/src/service/client-request-state.h              |  1 -
 be/src/service/impala-http-handler.cc              | 12 +++
 be/src/service/query-state-record-test.cc          | 26 +++++-
 be/src/service/query-state-record.cc               | 19 ++++-
 be/src/service/query-state-record.h                | 12 +++
 be/src/service/workload-management-worker.cc       | 10 +++
 be/src/service/workload-management-worker.h        |  2 +-
 .../workload-management-fields-test.cc             |  9 ++
 be/src/workload_mgmt/workload-management-test.cc   |  9 +-
 be/src/workload_mgmt/workload-management.cc        |  8 +-
 be/src/workload_mgmt/workload-management.h         |  4 +-
 common/thrift/SystemTables.thrift                  |  4 +
 .../workload-mgmt-impala_query_live-v1.2.0.test    | 66 +++++++++++++++
 .../workload-mgmt-impala_query_log-v1.2.0.test     | 66 +++++++++++++++
 tests/custom_cluster/test_query_live.py            | 10 +--
 tests/custom_cluster/test_workload_mgmt_init.py    | 97 ++++++++++++++++++----
 tests/util/workload_management.py                  | 20 ++++-
 www/queries.tmpl                                   | 16 ++++
 www/query_detail_tabs.tmpl                         |  4 +
 20 files changed, 371 insertions(+), 34 deletions(-)

diff --git a/be/src/exec/system-table-scanner.cc 
b/be/src/exec/system-table-scanner.cc
index 629336a9d..141aa77c6 100644
--- a/be/src/exec/system-table-scanner.cc
+++ b/be/src/exec/system-table-scanner.cc
@@ -405,6 +405,16 @@ Status QueryScanner::MaterializeNextTuple(
           RETURN_IF_ERROR(WriteStringSlot(join(query.orderby_columns, ","), 
pool, slot));
         }
         break;
+      case TQueryTableColumn::COORDINATOR_SLOTS:
+        if (LIKELY(IncludeField(TQueryTableColumn::type::COORDINATOR_SLOTS))) {
+          WriteBigIntSlot(record.coordinator_slots, slot);
+        }
+        break;
+      case TQueryTableColumn::EXECUTOR_SLOTS:
+        if (LIKELY(IncludeField(TQueryTableColumn::type::EXECUTOR_SLOTS))) {
+          WriteBigIntSlot(record.executor_slots, slot);
+        }
+        break;
       default:
         LOG(WARNING) << "Unknown column (position " << slot_desc->col_pos() << 
") added"
             " to table " << table_name_ << "; check if a coordinator was 
upgraded";
diff --git a/be/src/service/client-request-state.h 
b/be/src/service/client-request-state.h
index dd6e6cffb..8a8543295 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -38,7 +38,6 @@
 namespace impala {
 
 class AdmissionControlClient;
-class ClientRequestStateCleaner;
 class Coordinator;
 class Expr;
 class Frontend;
diff --git a/be/src/service/impala-http-handler.cc 
b/be/src/service/impala-http-handler.cc
index 26ef6390d..d7b7bcd85 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -505,6 +505,14 @@ void ImpalaHttpHandler::AddQueryRecordTips(Document* 
document) {
 
   document->AddMember("tips_statement", "The statement submitted for the 
query.",
       document->GetAllocator());
+
+  document->AddMember("tips_coordinator_slots",
+      "The number of admission control slots used on the coordinator.",
+      document->GetAllocator());
+
+  document->AddMember("tips_executor_slots",
+      "The number of admission control slots used on the executors.",
+      document->GetAllocator());
 }
 
 std::string ImpalaHttpHandler::ProgressToString(int64_t num_completed, int64_t 
total) {
@@ -645,6 +653,10 @@ void ImpalaHttpHandler::QueryStateToJson(const 
QueryStateRecord& record,
 
   Value resource_pool(record.resource_pool.c_str(), document->GetAllocator());
   value->AddMember("resource_pool", resource_pool, document->GetAllocator());
+
+  value->AddMember(
+      "coordinator_slots", record.coordinator_slots, document->GetAllocator());
+  value->AddMember("executor_slots", record.executor_slots, 
document->GetAllocator());
 }
 
 void ImpalaHttpHandler::QueryStateHandler(const Webserver::WebRequest& req,
diff --git a/be/src/service/query-state-record-test.cc 
b/be/src/service/query-state-record-test.cc
index 49bca460f..318c6608b 100644
--- a/be/src/service/query-state-record-test.cc
+++ b/be/src/service/query-state-record-test.cc
@@ -153,4 +153,28 @@ TEST(QueryStateRecordTest, 
PerHostStatePeakMemoryComparatorEqual) {
   EXPECT_FALSE(PerHostPeakMemoryComparator(pair_b, pair_a));
 }
 
-} //namespace impala
+/// Build a BackendExecParam for testing.
+void add_param(QuerySchedulePB& query_schedule, int slots_to_use, bool 
is_coordinator) {
+  BackendExecParamsPB* params = query_schedule.add_backend_exec_params();
+  params->set_slots_to_use(slots_to_use);
+  params->set_is_coord_backend(is_coordinator);
+}
+
+TEST(QueryStateRecordTest, AdmissionSlots) {
+  // Build a QuerySchedule with 3 backends:
+  // - one coordinator with 4 admission slots
+  // - two executors, the first with 7 admission slots, the second with 8.
+  //   These two values will not normally differ; this is done to show that the 
first
+  //   value is used.
+  QuerySchedulePB query_schedule;
+  add_param(query_schedule, 4, true);
+  add_param(query_schedule, 7, false);
+  add_param(query_schedule, 8, false);
+  int coordinator_slots = 
QueryStateRecord::get_admission_slots(&query_schedule, true);
+  int executor_slots = QueryStateRecord::get_admission_slots(&query_schedule, 
false);
+  // Verify that get_admission_slots() returns the right values.
+  EXPECT_EQ(coordinator_slots, 4);
+  EXPECT_EQ(executor_slots, 7);
+}
+
+} // namespace impala
diff --git a/be/src/service/query-state-record.cc 
b/be/src/service/query-state-record.cc
index fbe1f774c..09d9b5720 100644
--- a/be/src/service/query-state-record.cc
+++ b/be/src/service/query-state-record.cc
@@ -83,6 +83,8 @@ void QueryStateRecord::Init(const ClientRequestState& 
query_handle) {
     cluster_mem_est = query_handle.schedule()->cluster_mem_est();
     bytes_read = utilization.bytes_read;
     bytes_sent = utilization.exchange_bytes_sent + utilization.scan_bytes_sent;
+    coordinator_slots = get_admission_slots(query_handle.schedule(), true);
+    executor_slots = get_admission_slots(query_handle.schedule(), false);
     has_coord = true;
   } else {
     num_completed_scan_ranges = 0;
@@ -93,6 +95,8 @@ void QueryStateRecord::Init(const ClientRequestState& 
query_handle) {
     cluster_mem_est = 0;
     bytes_read = 0;
     bytes_sent = 0;
+    coordinator_slots = 0;
+    executor_slots = 0;
     has_coord = false;
   }
   beeswax_query_state = query_handle.BeeswaxQueryState();
@@ -135,7 +139,20 @@ void QueryStateRecord::Init(const ClientRequestState& 
query_handle) {
   }
 }
 
-bool QueryStateRecord::StartTimeComparator::operator() (
+int64_t QueryStateRecord::get_admission_slots(
+    const QuerySchedulePB* query_schedule, bool is_coordinator) {
+  int64_t number_slots = 0;
+  for (const auto& entry : query_schedule->backend_exec_params()) {
+    if (entry.is_coord_backend() == is_coordinator) {
+      number_slots = entry.slots_to_use();
+      // Stop looking as soon as we find one suitable backend.
+      break;
+    }
+  }
+  return number_slots;
+}
+
+bool QueryStateRecord::StartTimeComparator::operator()(
     const QueryStateRecord& lhs, const QueryStateRecord& rhs) const {
   if (lhs.start_time_us == rhs.start_time_us) return lhs.id < rhs.id;
   return lhs.start_time_us < rhs.start_time_us;
diff --git a/be/src/service/query-state-record.h 
b/be/src/service/query-state-record.h
index 27ce4ac37..0dd02ca5b 100644
--- a/be/src/service/query-state-record.h
+++ b/be/src/service/query-state-record.h
@@ -24,6 +24,7 @@
 
 #include "gen-cpp/ExecStats_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gen-cpp/admission_control_service.pb.h"
 #include "util/network-util.h"
 
 namespace impala {
@@ -146,6 +147,11 @@ struct QueryStateRecord {
   /// True if this query was retried, false otherwise.
   bool was_retried = false;
 
+  /// Number of Admission Control Slots used on Coordinator.
+  int64_t coordinator_slots;
+  /// Number of Admission Control Slots used on Executor.
+  int64_t executor_slots;
+
   /// If this query was retried, the query id of the retried query.
   std::unique_ptr<const TUniqueId> retried_query_id;
 
@@ -165,6 +171,12 @@ struct QueryStateRecord {
     bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) 
const;
   };
 
+  /// Get the number of admission slots used from the Query Schedule.
+  /// If 'is_coordinator' is true then only look at the coordinator backend, 
otherwise
+  /// use the value from the first executor backend.
+  static int64_t get_admission_slots(
+      const QuerySchedulePB* query_schedule, bool is_coordinator);
+
   private:
   // Common initialization for constructors.
   void Init(const ClientRequestState& exec_state);
diff --git a/be/src/service/workload-management-worker.cc 
b/be/src/service/workload-management-worker.cc
index 91db5accb..8d9dbdc18 100644
--- a/be/src/service/workload-management-worker.cc
+++ b/be/src/service/workload-management-worker.cc
@@ -410,6 +410,16 @@ const std::array<FieldParser, NumQueryTableColumns> 
FIELD_PARSERS = {{
     // orderby columns
     {[](FieldParserContext& ctx) {
       ctx.sql << "'" << boost::algorithm::join(ctx.record->orderby_columns, 
",") << "'";
+    }},
+
+    // coordinator slots
+    {[](FieldParserContext& ctx) {
+      ctx.sql << ctx.record->base_state->coordinator_slots;
+    }},
+
+    // executor slots
+    {[](FieldParserContext& ctx) {
+      ctx.sql << ctx.record->base_state->executor_slots;
     }}}}; // FIELD_PARSERS constant array
 
 } // namespace workloadmgmt
diff --git a/be/src/service/workload-management-worker.h 
b/be/src/service/workload-management-worker.h
index 91086d35c..a09e00f5d 100644
--- a/be/src/service/workload-management-worker.h
+++ b/be/src/service/workload-management-worker.h
@@ -53,7 +53,7 @@ struct FieldParserContext {
 using FieldParser = void (*)(FieldParserContext&);
 
 /// Number of query table columns. Used to initialize a std::array.
-constexpr size_t NumQueryTableColumns = TQueryTableColumn::ORDERBY_COLUMNS + 1;
+constexpr size_t NumQueryTableColumns = TQueryTableColumn::EXECUTOR_SLOTS + 1;
 
 /// Array containing parser functions for each query column. These parsers are 
used to
 /// generate the value for each query column from a completed query 
represented by a
diff --git a/be/src/workload_mgmt/workload-management-fields-test.cc 
b/be/src/workload_mgmt/workload-management-fields-test.cc
index 96fe6492a..41febcf68 100644
--- a/be/src/workload_mgmt/workload-management-fields-test.cc
+++ b/be/src/workload_mgmt/workload-management-fields-test.cc
@@ -89,6 +89,13 @@ FieldDefEntry _createV110String(TQueryTableColumn::type 
db_col) {
   return make_pair<>(db_col, FieldDefinition(TPrimitiveType::STRING, v));
 }
 
+FieldDefEntry _createV120BigInt(TQueryTableColumn::type db_col) {
+  Version v;
+  EXPECT_TRUE(ParseVersion("1.2.0", &v).ok());
+
+  return make_pair<>(db_col, FieldDefinition(TPrimitiveType::BIGINT, v));
+}
+
 // Predicate function that can be passed to gtest EXPECT/ASSERT calls to 
determine
 // correctness of a FIELD_DEFINITIONS map entry.
 bool _fieldDefsEqual(const FieldDefEntry& lhs,
@@ -156,6 +163,8 @@ TEST(WorkloadManagementFieldsTest, CheckFieldDefinitions) {
       _createV110String(TQueryTableColumn::JOIN_COLUMNS),
       _createV110String(TQueryTableColumn::AGGREGATE_COLUMNS),
       _createV110String(TQueryTableColumn::ORDERBY_COLUMNS),
+      _createV120BigInt(TQueryTableColumn::COORDINATOR_SLOTS),
+      _createV120BigInt(TQueryTableColumn::EXECUTOR_SLOTS),
   };
 
   ASSERT_EQ(FIELD_DEFINITIONS.size(), expected_field_defs.size());
diff --git a/be/src/workload_mgmt/workload-management-test.cc 
b/be/src/workload_mgmt/workload-management-test.cc
index d2de2b70c..d7cc090f6 100644
--- a/be/src/workload_mgmt/workload-management-test.cc
+++ b/be/src/workload_mgmt/workload-management-test.cc
@@ -54,7 +54,7 @@ TEST(WorkloadManagementTest, ParseSchemaVersionFlagEmpty) {
   Version v;
 
   ASSERT_OK(ParseSchemaVersionFlag(&v));
-  ASSERT_EQ("1.1.0", v.ToString());
+  ASSERT_EQ("1.2.0", v.ToString());
 }
 
 TEST(WorkloadManagementTest, ParseSchemaVersionFlagInvalid) {
@@ -100,7 +100,7 @@ TEST(WorkloadManagementTest, StartupChecksUnknownVersion) {
 
   EXPECT_EQ(TErrorCode::type::GENERAL, ret.code());
   EXPECT_EQ("Workload management schema version '99999.99999.99999' is not one 
of the "
-            "known versions: '1.0.0', '1.1.0'",
+            "known versions: '1.0.0', '1.1.0', '1.2.0'",
       ret.msg().msg());
 }
 
@@ -133,7 +133,7 @@ TEST(WorkloadManagementTest, QueryLiveTableNameWithDb) {
 TEST(WorkloadManagementTest, KnownVersions) {
   // Asserts the known versions are correct and are in the correct order in the
   // KNOWN_VERSIONS set.
-  ASSERT_EQ(2, KNOWN_VERSIONS.size());
+  ASSERT_EQ(3, KNOWN_VERSIONS.size());
 
   auto iter = KNOWN_VERSIONS.cbegin();
 
@@ -143,5 +143,8 @@ TEST(WorkloadManagementTest, KnownVersions) {
   EXPECT_EQ("1.1.0", iter->ToString());
   iter++;
 
+  EXPECT_EQ("1.2.0", iter->ToString());
+  iter++;
+
   EXPECT_EQ(KNOWN_VERSIONS.cend(), iter);
 }
diff --git a/be/src/workload_mgmt/workload-management.cc 
b/be/src/workload_mgmt/workload-management.cc
index fffdeea68..98216a23b 100644
--- a/be/src/workload_mgmt/workload-management.cc
+++ b/be/src/workload_mgmt/workload-management.cc
@@ -173,8 +173,12 @@ const std::map<TQueryTableColumn::type, FieldDefinition> 
FIELD_DEFINITIONS = {
     {TQueryTableColumn::AGGREGATE_COLUMNS,
         FieldDefinition(TPrimitiveType::STRING, VERSION_1_1_0)},
     {TQueryTableColumn::ORDERBY_COLUMNS,
-        FieldDefinition(
-            TPrimitiveType::STRING, VERSION_1_1_0)}}; // FIELD_DEFINITIONS 
constant list
+        FieldDefinition(TPrimitiveType::STRING, VERSION_1_1_0)},
+    {TQueryTableColumn::COORDINATOR_SLOTS,
+        FieldDefinition(TPrimitiveType::BIGINT, VERSION_1_2_0)},
+    {TQueryTableColumn::EXECUTOR_SLOTS,
+        FieldDefinition(TPrimitiveType::BIGINT, VERSION_1_2_0)}};
+// FIELD_DEFINITIONS constant list
 
 /// Variable to cache the Version object created by parsing the workload 
management schema
 /// version startup flag. Variable must only be modified during the workload 
management
diff --git a/be/src/workload_mgmt/workload-management.h 
b/be/src/workload_mgmt/workload-management.h
index 9e4f10e4b..ccdb185cc 100644
--- a/be/src/workload_mgmt/workload-management.h
+++ b/be/src/workload_mgmt/workload-management.h
@@ -58,10 +58,12 @@ const std::string WM_DB = "sys";
 const kudu::Version NO_TABLE_EXISTS = kudu::Version();
 const kudu::Version VERSION_1_0_0 = impala::ConstructVersion(1, 0, 0);
 const kudu::Version VERSION_1_1_0 = impala::ConstructVersion(1, 1, 0);
+const kudu::Version VERSION_1_2_0 = impala::ConstructVersion(1, 2, 0);
 
 /// Set of all possible valid schema versions.  Must be sorted in order from 
earliest to
 /// latest version.
-const std::set<kudu::Version> KNOWN_VERSIONS = {VERSION_1_0_0, VERSION_1_1_0};
+const std::set<kudu::Version> KNOWN_VERSIONS = {
+    VERSION_1_0_0, VERSION_1_1_0, VERSION_1_2_0};
 
 /// Constants declaring how durations measured in milliseconds will be stored 
in the
 /// table. Must match the constants with the same name declared in 
SystemTable.java.
diff --git a/common/thrift/SystemTables.thrift 
b/common/thrift/SystemTables.thrift
index 25b30d893..e1cd1ca06 100644
--- a/common/thrift/SystemTables.thrift
+++ b/common/thrift/SystemTables.thrift
@@ -80,4 +80,8 @@ enum TQueryTableColumn {
     JOIN_COLUMNS
     AGGREGATE_COLUMNS
     ORDERBY_COLUMNS
+
+    # Change 2 - Added Columns for Workload Management Schema Version 1.2.0
+    COORDINATOR_SLOTS
+    EXECUTOR_SLOTS
 }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.2.0.test
 
b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.2.0.test
new file mode 100644
index 000000000..d6d5122ce
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_live-v1.2.0.test
@@ -0,0 +1,66 @@
+====
+---- QUERY
+describe formatted sys.impala_query_live
+---- RESULTS
+'cluster_id','string',''
+'query_id','string',''
+'session_id','string',''
+'session_type','string',''
+'hiveserver2_protocol_version','string',''
+'db_user','string',''
+'db_user_connection','string',''
+'db_name','string',''
+'impala_coordinator','string',''
+'query_status','string',''
+'query_state','string',''
+'impala_query_end_state','string',''
+'query_type','string',''
+'network_address','string',''
+'start_time_utc','timestamp',''
+'total_time_ms','decimal(18,3)',''
+'query_opts_config','string',''
+'resource_pool','string',''
+'per_host_mem_estimate','bigint',''
+'dedicated_coord_mem_estimate','bigint',''
+'per_host_fragment_instances','string',''
+'backends_count','int',''
+'admission_result','string',''
+'cluster_memory_admitted','bigint',''
+'executor_group','string',''
+'executor_groups','string',''
+'exec_summary','string',''
+'num_rows_fetched','bigint',''
+'row_materialization_rows_per_sec','bigint',''
+'row_materialization_time_ms','decimal(18,3)',''
+'compressed_bytes_spilled','bigint',''
+'event_planning_finished','decimal(18,3)',''
+'event_submit_for_admission','decimal(18,3)',''
+'event_completed_admission','decimal(18,3)',''
+'event_all_backends_started','decimal(18,3)',''
+'event_rows_available','decimal(18,3)',''
+'event_first_row_fetched','decimal(18,3)',''
+'event_last_row_fetched','decimal(18,3)',''
+'event_unregister_query','decimal(18,3)',''
+'read_io_wait_total_ms','decimal(18,3)',''
+'read_io_wait_mean_ms','decimal(18,3)',''
+'bytes_read_cache_total','bigint',''
+'bytes_read_total','bigint',''
+'pernode_peak_mem_min','bigint',''
+'pernode_peak_mem_max','bigint',''
+'pernode_peak_mem_mean','bigint',''
+'sql','string',''
+'plan','string',''
+'tables_queried','string',''
+'select_columns','string',''
+'where_columns','string',''
+'join_columns','string',''
+'aggregate_columns','string',''
+'orderby_columns','string',''
+'coordinator_slots','bigint',''
+'executor_slots','bigint',''
+---- RESULTS: VERIFY_IS_SUBSET
+'','schema_version      ','1.0.0               '
+'','wm_schema_version   ','1.2.0               '
+---- TYPES
+string,string,string
+====
\ No newline at end of file
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.2.0.test
 
b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.2.0.test
new file mode 100644
index 000000000..eaa5e8e40
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/workload-mgmt-impala_query_log-v1.2.0.test
@@ -0,0 +1,66 @@
+====
+---- QUERY
+describe formatted sys.impala_query_log
+---- RESULTS
+'cluster_id','string',NULL
+'query_id','string',NULL
+'session_id','string',NULL
+'session_type','string',NULL
+'hiveserver2_protocol_version','string',NULL
+'db_user','string',NULL
+'db_user_connection','string',NULL
+'db_name','string',NULL
+'impala_coordinator','string',NULL
+'query_status','string',NULL
+'query_state','string',NULL
+'impala_query_end_state','string',NULL
+'query_type','string',NULL
+'network_address','string',NULL
+'start_time_utc','timestamp',NULL
+'total_time_ms','decimal(18,3)',NULL
+'query_opts_config','string',NULL
+'resource_pool','string',NULL
+'per_host_mem_estimate','bigint',NULL
+'dedicated_coord_mem_estimate','bigint',NULL
+'per_host_fragment_instances','string',NULL
+'backends_count','int',NULL
+'admission_result','string',NULL
+'cluster_memory_admitted','bigint',NULL
+'executor_group','string',NULL
+'executor_groups','string',NULL
+'exec_summary','string',NULL
+'num_rows_fetched','bigint',NULL
+'row_materialization_rows_per_sec','bigint',NULL
+'row_materialization_time_ms','decimal(18,3)',NULL
+'compressed_bytes_spilled','bigint',NULL
+'event_planning_finished','decimal(18,3)',NULL
+'event_submit_for_admission','decimal(18,3)',NULL
+'event_completed_admission','decimal(18,3)',NULL
+'event_all_backends_started','decimal(18,3)',NULL
+'event_rows_available','decimal(18,3)',NULL
+'event_first_row_fetched','decimal(18,3)',NULL
+'event_last_row_fetched','decimal(18,3)',NULL
+'event_unregister_query','decimal(18,3)',NULL
+'read_io_wait_total_ms','decimal(18,3)',NULL
+'read_io_wait_mean_ms','decimal(18,3)',NULL
+'bytes_read_cache_total','bigint',NULL
+'bytes_read_total','bigint',NULL
+'pernode_peak_mem_min','bigint',NULL
+'pernode_peak_mem_max','bigint',NULL
+'pernode_peak_mem_mean','bigint',NULL
+'sql','string',NULL
+'plan','string',NULL
+'tables_queried','string',NULL
+'select_columns','string',NULL
+'where_columns','string',NULL
+'join_columns','string',NULL
+'aggregate_columns','string',NULL
+'orderby_columns','string',NULL
+'coordinator_slots','bigint',NULL
+'executor_slots','bigint',NULL
+---- RESULTS: VERIFY_IS_SUBSET
+'','schema_version      ','1.0.0               '
+'','wm_schema_version   ','1.2.0               '
+---- TYPES
+string,string,string
+====
diff --git a/tests/custom_cluster/test_query_live.py 
b/tests/custom_cluster/test_query_live.py
index be16ea512..d5a6f169f 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -49,7 +49,7 @@ class TestQueryLive(CustomClusterTestSuite):
     # Alter can add additional event fields. Filter them out.
     describe_ext_data = [
         line for line in describe_ext_result.data if 'impala.events.catalog' 
not in line]
-    assert len(describe_ext_data) == 86
+    assert len(describe_ext_data) == 88
     system_table_re = re.compile(r'__IMPALA_SYSTEM_TABLE\s+true')
     assert list(filter(system_table_re.search, describe_ext_data))
     external_re = re.compile(r'EXTERNAL\s+TRUE')
@@ -139,7 +139,7 @@ class TestQueryLive(CustomClusterTestSuite):
 
     # describe query
     describe_result = self.execute_query('describe sys.impala_query_live')
-    assert len(describe_result.data) == 54
+    assert len(describe_result.data) == 56
     self.assert_describe_extended()
 
     # show create table
@@ -224,13 +224,13 @@ class TestQueryLive(CustomClusterTestSuite):
 
     try:
       describe_column = self.execute_query('describe sys.impala_query_live')
-      assert len(describe_column.data) == 55
+      assert len(describe_column.data) == 57
       assert column_desc in describe_column.data
 
       select_column = self.execute_query(
           'select test_alter from sys.impala_query_live limit 1')
       assert select_column.data == ['NULL']
-      self.assert_impalad_log_contains('WARNING', r'Unknown column \(position 
54\)'
+      self.assert_impalad_log_contains('WARNING', r'Unknown column \(position 
56\)'
           + ' added to table IMPALA_QUERY_LIVE; check if a coordinator was 
upgraded')
     finally:
       # Ensure new column is dropped in case of test failure
@@ -240,7 +240,7 @@ class TestQueryLive(CustomClusterTestSuite):
     assert drop_column.data == ['Column has been dropped.']
 
     describe_column2 = self.execute_query('describe sys.impala_query_live')
-    assert len(describe_column2.data) == 54
+    assert len(describe_column2.data) == 56
     assert column_desc not in describe_column2.data
 
     select_column2 = self.execute_query('select * from sys.impala_query_live')
diff --git a/tests/custom_cluster/test_workload_mgmt_init.py 
b/tests/custom_cluster/test_workload_mgmt_init.py
index 7918592c1..4b6d64e04 100644
--- a/tests/custom_cluster/test_workload_mgmt_init.py
+++ b/tests/custom_cluster/test_workload_mgmt_init.py
@@ -40,6 +40,8 @@ class TestWorkloadManagementInitBase(CustomClusterTestSuite):
   QUERY_TBL_LIVE_NAME = "impala_query_live"
   QUERY_TBL_LIVE = "{0}.{1}".format(WM_DB, QUERY_TBL_LIVE_NAME)
 
+  LATEST_SCHEMA = "1.2.0"
+
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -146,9 +148,9 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
       disable_log_buffering=True)
   def test_no_upgrade(self, vector):
     """Tests that no upgrade happens when starting a cluster where the 
workload management
-       tables are already at version 1.1.0."""
-    self.restart_cluster(vector, schema_version="1.1.0", log_symlinks=True)
-    self.check_schema("1.1.0", vector)
+       tables are already at the latest version."""
+    self.restart_cluster(vector, schema_version=self.LATEST_SCHEMA, 
log_symlinks=True)
+    self.check_schema(self.LATEST_SCHEMA, vector)
 
     self.assert_catalogd_log_contains("INFO", r"Workload management table .*? 
will be "
         r"upgraded", expected_count=0)
@@ -166,14 +168,25 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
   @CustomClusterTestSuite.with_args(cluster_size=10, 
disable_log_buffering=True,
       log_symlinks=True,
-      impalad_args="--enable_workload_mgmt",
+      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.1.0",
       catalogd_args="--enable_workload_mgmt "
+                    "--workload_mgmt_schema_version=1.1.0 "
                     
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live")
   def test_create_on_version_1_1_0(self, vector):
     """Asserts that workload management tables are properly created on version 
1.1.0 using
        a 10 node cluster when no tables exist."""
     self.check_schema("1.1.0", vector, multiple_impalad=True)
 
+  @CustomClusterTestSuite.with_args(cluster_size=10, 
disable_log_buffering=True,
+      log_symlinks=True,
+      impalad_args="--enable_workload_mgmt",
+      catalogd_args="--enable_workload_mgmt "
+                    
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live")
+  def test_create_on_version_1_2_0(self, vector):
+    """Asserts that workload management tables are properly created on the 
latest version
+       using a 10 node cluster when no tables exist."""
+    self.check_schema("1.2.0", vector, multiple_impalad=True)
+
   @CustomClusterTestSuite.with_args(cluster_size=1,
       impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.0.0",
       catalogd_args="--enable_workload_mgmt "
@@ -184,7 +197,7 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
     """Asserts that an upgrade from version 1.0.0 to 1.1.0 succeeds when 
starting with no
        existing workload management tables."""
 
-    # Veriy the initial table create on version 1.0.0 succeeded.
+    # Verify the initial table create on version 1.0.0 succeeded.
     self.check_schema("1.0.0", vector)
     self.assert_log_contains("catalogd", "WARNING", r"Target schema version 
'1.0.0' is "
         r"not the latest schema version '\d+\.\d+\.\d+'")
@@ -198,6 +211,54 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
     self.check_schema("1.1.0", vector)
 
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.1.0",
+      catalogd_args="--enable_workload_mgmt "
+                    "--workload_mgmt_schema_version=1.1.0 "
+                    
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live",
+      disable_log_buffering=True)
+  def test_upgrade_1_1_0_to_1_2_0(self, vector):
+    """Asserts that an upgrade from version 1.1.0 to 1.2.0 succeeds when 
starting with no
+       existing workload management tables."""
+
+    # Verify the initial table create on version 1.1.0 succeeded.
+    self.check_schema("1.1.0", vector)
+    self.assert_log_contains("catalogd", "WARNING", r"Target schema version 
'1.1.0' is "
+        r"not the latest schema version '\d+\.\d+\.\d+'")
+
+    self.restart_cluster(vector, schema_version="1.2.0", cluster_size=1,
+        log_symlinks=True)
+
+    # Assert the upgrade process ran.
+    self.assert_catalogd_all_tables(r"Workload management table '{}' is at 
version "
+        r"'1.1.0' and will be upgraded")
+
+    self.check_schema("1.2.0", vector)
+
+  @CustomClusterTestSuite.with_args(cluster_size=1,
+      impalad_args="--enable_workload_mgmt 
--workload_mgmt_schema_version=1.0.0",
+      catalogd_args="--enable_workload_mgmt "
+                    "--workload_mgmt_schema_version=1.0.0 "
+                    
"--workload_mgmt_drop_tables=impala_query_log,impala_query_live",
+      disable_log_buffering=True)
+  def test_upgrade_1_0_0_to_1_2_0(self, vector):
+    """Asserts that an upgrade from version 1.0.0 to 1.2.0 succeeds when 
starting with no
+       existing workload management tables."""
+
+    # Verify the initial table create on version 1.0.0 succeeded.
+    self.check_schema("1.0.0", vector)
+    self.assert_log_contains("catalogd", "WARNING", r"Target schema version 
'1.0.0' is "
+        r"not the latest schema version '\d+\.\d+\.\d+'")
+
+    self.restart_cluster(vector, schema_version="1.2.0", cluster_size=1,
+        log_symlinks=True)
+
+    # Assert the upgrade process ran.
+    self.assert_catalogd_all_tables(r"Workload management table '{}' is at 
version "
+        r"'1.0.0' and will be upgraded")
+
+    self.check_schema("1.2.0", vector)
+
   @CustomClusterTestSuite.with_args(cluster_size=1, 
impalad_args="--enable_workload_mgmt",
       catalogd_args="--enable_workload_mgmt", disable_log_buffering=True)
   def test_log_table_newer_schema_version(self, vector):
@@ -208,16 +269,16 @@ class 
TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
         log_symlinks=True, 
additional_impalad_opts="--query_log_write_interval_s=15")
 
     self.assert_catalogd_log_contains("WARNING", "Target schema version 
'1.0.0' is not "
-        "the latest schema version '1.1.0'")
+        "the latest schema version '{}'".format(self.LATEST_SCHEMA))
 
-    # The workload management tables will be on schema version 1.1.0.
-    self.check_schema("1.1.0", vector)
+    # The workload management tables will be on the latest schema version.
+    self.check_schema(self.LATEST_SCHEMA, vector)
 
     # The workload management processing will be running on schema version 
1.0.0.
     self.assert_catalogd_all_tables(r"Target schema version '1.0.0' of the 
'{}' table is "
         r"lower than the actual schema version")
 
-    # Run a query and ensure it does not populate version 1.1.0 fields.
+    # Run a query and ensure it does not populate fields other than version 
1.0.0 fields.
     res = self.client.execute("select * from functional.alltypes")
     assert res.success
 
@@ -230,7 +291,9 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
           TQueryTableColumn.WHERE_COLUMNS: "",
           TQueryTableColumn.JOIN_COLUMNS: "",
           TQueryTableColumn.AGGREGATE_COLUMNS: "",
-          TQueryTableColumn.ORDERBY_COLUMNS: ""})
+          TQueryTableColumn.ORDERBY_COLUMNS: "",
+          TQueryTableColumn.COORDINATOR_SLOTS: "0",
+          TQueryTableColumn.EXECUTOR_SLOTS: "0"})
 
     # Check the query log table.
     impalad.service.wait_for_metric_value(
@@ -241,7 +304,9 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
           TQueryTableColumn.WHERE_COLUMNS: "NULL",
           TQueryTableColumn.JOIN_COLUMNS: "NULL",
           TQueryTableColumn.AGGREGATE_COLUMNS: "NULL",
-          TQueryTableColumn.ORDERBY_COLUMNS: "NULL"})
+          TQueryTableColumn.ORDERBY_COLUMNS: "NULL",
+          TQueryTableColumn.COORDINATOR_SLOTS: "NULL",
+          TQueryTableColumn.EXECUTOR_SLOTS: "NULL"})
 
   @CustomClusterTestSuite.with_args(cluster_size=1, disable_log_buffering=True,
       log_symlinks=True,
@@ -268,7 +333,7 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
         .format(self.WM_DB)).success
 
     self.restart_cluster(vector, log_symlinks=True)
-    self.check_schema("1.1.0", vector)
+    self.check_schema(self.LATEST_SCHEMA, vector)
 
   def _run_invalid_table_prop_test(self, table, prop_name, vector, expect_success=False):
     """Runs a test where one of the workload management schema version table 
properties on
@@ -360,9 +425,9 @@ class TestWorkloadManagementInitWait(TestWorkloadManagementInitBase):
 
     self.restart_cluster(vector, cluster_size=1, log_symlinks=True,
         additional_impalad_opts="--query_log_write_interval_s=30")
-    self.check_schema("1.1.0", vector)
+    self.check_schema(self.LATEST_SCHEMA, vector)
 
-    # Run a query and ensure it does not populate version 1.1.0 fields.
+    # Run a query and ensure it does not populate fields from the latest schema.
     res = self.client.execute("select * from functional.alltypes")
     assert res.success
 
@@ -507,11 +572,11 @@ class TestWorkloadManagementInitNoWait(TestWorkloadManagementInitBase):
        Cluster sizes of 1 are used to speed up the initial setup."""
     self.wait_for_log_exists("impalad", "FATAL")
     self.assert_impalad_log_contains("FATAL", r"Workload management schema version "
-        r"'0.0.1' is not one of the known versions: '1.0.0', '1.1.0'$")
+        r"'0.0.1' is not one of the known versions: '1.0.0', '1.1.0', '1.2.0'$")
 
     self.wait_for_log_exists("catalogd", "FATAL")
     self.assert_catalogd_log_contains("FATAL", r"Workload management schema version "
-        r"'0.0.1' is not one of the known versions: '1.0.0', '1.1.0'$")
+        r"'0.0.1' is not one of the known versions: '1.0.0', '1.1.0', '1.2.0'$")
 
   @CustomClusterTestSuite.with_args(start_args="--enable_catalogd_ha",
       statestored_args="--use_subscriber_id_as_catalogd_priority=true",
diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py
index f5a450c59..1e3cae8d9 100644
--- a/tests/util/workload_management.py
+++ b/tests/util/workload_management.py
@@ -608,9 +608,23 @@ def assert_query(query_tbl, client, expected_cluster_id="", raw_profile=None,
   # OrderBy Columns
   assert_col(TQueryTableColumn.ORDERBY_COLUMNS, r'\n\s+OrderBy Columns:\s+(.*?)\n')
 
-  # Assert all entries have been tested and added to ret_data
-  for i in range(len(TQueryTableColumn._VALUES_TO_NAMES)):
-    assert TQueryTableColumn._VALUES_TO_NAMES[i] in ret_data
+  # Coordinator and Executor Slots Columns
+  admission_slots = re.findall(
+    r'\n\s+\-\s+AdmissionSlots:\s+(\d*?)\s+.*?\n', profile_text)
+  value = column_val(TQueryTableColumn.COORDINATOR_SLOTS)
+  if TQueryTableColumn.COORDINATOR_SLOTS in expected_overrides:
+    assert value == expected_overrides[TQueryTableColumn.COORDINATOR_SLOTS]
+  else:
+    # The first host has the coordinator admission slots.
+    expected_coordinator_slots = admission_slots[0] if len(admission_slots) > 0 else "0"
+    assert value == expected_coordinator_slots
+  value = column_val(TQueryTableColumn.EXECUTOR_SLOTS)
+  if TQueryTableColumn.EXECUTOR_SLOTS in expected_overrides:
+    assert value == expected_overrides[TQueryTableColumn.EXECUTOR_SLOTS]
+  else:
+    # Take executor admission slots from the second impalad.
+    expected_executor_slots = admission_slots[1] if len(admission_slots) > 1 else "0"
+    assert value == expected_executor_slots
 
   return ret_data
 # function assert_query
diff --git a/www/queries.tmpl b/www/queries.tmpl
index 124b5fefc..49c0ad458 100644
--- a/www/queries.tmpl
+++ b/www/queries.tmpl
@@ -72,6 +72,8 @@ command line parameter.</p>
     <th title="{{tips_last_event}}">Last Event</th>
     <th title="{{tips_rows_fetched}}"># rows fetched</th>
     <th title="{{tips_resource_pool}}">Resource Pool</th>
+    <th title="{{tips_coordinator_slots}}">Coordinator Slots</th>
+    <th title="{{tips_executor_slots}}">Executor Slots</th>
     <th title="{{tips_statement}}">Statement</th>
   </tr>
 {{! filter to get just executing queries from in_flight_queries}}
@@ -93,6 +95,8 @@ command line parameter.</p>
     <td><samp>{{last_event}}</samp></td>
     <td>{{rows_fetched}}</td>
     <td>{{resource_pool}}</td>
+    <td>{{coordinator_slots}}</td>
+    <td>{{executor_slots}}</td>
     <td><samp>{{stmt}}</samp></td>
   </tr>
 {{/executing}}
@@ -142,6 +146,8 @@ command line parameter.</p>
     <th title="{{tips_last_event}}">Last Event</th>
     <th title="{{tips_rows_fetched}}"># rows fetched</th>
     <th title="{{tips_resource_pool}}">Resource Pool</th>
+    <th title="{{tips_coordinator_slots}}">Coordinator Slots</th>
+    <th title="{{tips_executor_slots}}">Executor Slots</th>
     <th title="{{tips_statement}}">Statement</th>
   </tr>
 {{! filter to get just waiting queries from in_flight_queries}}
@@ -164,6 +170,8 @@ command line parameter.</p>
     <td><samp>{{last_event}}</samp></td>
     <td>{{rows_fetched}}</td>
     <td>{{resource_pool}}</td>
+    <td>{{coordinator_slots}}</td>
+    <td>{{executor_slots}}</td>
     <td><samp>{{stmt}}</samp></td>
   </tr>
 {{/waiting}}
@@ -208,6 +216,8 @@ command line parameter.</p>
     <th title="{{tips_state}}">State</th>
     <th title="{{tips_rows_fetched}}"># rows fetched</th>
     <th title="{{tips_resource_pool}}">Resource Pool</th>
+    <th title="{{tips_coordinator_slots}}">Coordinator Slots</th>
+    <th title="{{tips_executor_slots}}">Executor Slots</th>
     <th title="{{tips_statement}}">Statement</th>
   </tr>
 {{#completed_queries}}
@@ -226,6 +236,8 @@ command line parameter.</p>
     <td><samp>{{state}}</samp></td>
     <td>{{rows_fetched}}</td>
     <td>{{resource_pool}}</td>
+    <td>{{coordinator_slots}}</td>
+    <td>{{executor_slots}}</td>
     <td><samp>{{stmt}}</samp></td>
   </tr>
 {{/completed_queries}}
@@ -258,6 +270,8 @@ command line parameter.</p>
     <th title="{{tips_state}}">State</th>
     <th title="{{tips_rows_fetched}}"># rows fetched</th>
     <th title="{{tips_resource_pool}}">Resource Pool</th>
+    <th title="{{tips_coordinator_slots}}">Coordinator Slots</th>
+    <th title="{{tips_executor_slots}}">Executor Slots</th>
     <th title="{{tips_statement}}">Statement</th>
     <th>Delete</th>
   </tr>
@@ -407,6 +421,8 @@ command line parameter.</p>
         insertRowVal(row, query.state);
         insertRowVal(row, query.rows_fetched);
         insertRowVal(row, query.resource_pool);
+        insertRowVal(row, query.coordinator_slots);
+        insertRowVal(row, query.executor_slots);
         insertRowVal(row, query.statement);
         var deleteButton = document.createElement("input");
         deleteButton.type = "button";
diff --git a/www/query_detail_tabs.tmpl b/www/query_detail_tabs.tmpl
index 5f00d85c9..9850d69fe 100644
--- a/www/query_detail_tabs.tmpl
+++ b/www/query_detail_tabs.tmpl
@@ -55,6 +55,8 @@ under the License.
     {{/inflight}}
     <th title="{{tips_rows_fetched}}"># rows fetched</th>
     <th title="{{tips_resource_pool}}">Resource Pool</th>
+    <th title="{{tips_coordinator_slots}}">Coordinator Slots</th>
+    <th title="{{tips_executor_slots}}">Executor Slots</th>
   </tr>
 
   <tr>
@@ -86,6 +88,8 @@ under the License.
     {{/inflight}}
     <td>{{rows_fetched}}</td>
     <td>{{resource_pool}}</td>
+    <td>{{coordinator_slots}}</td>
+    <td>{{executor_slots}}</td>
   </tr>
 </table>
 {{/record_json}}


Reply via email to