This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 45995e6892720fce476f1c9df53941ed870d1a04 Author: Michael Smith <[email protected]> AuthorDate: Wed Dec 6 13:39:41 2023 -0800 IMPALA-12540: Query Live Table Defines SystemTables, which are in-memory tables that can provide access to Impala state. Adds the 'impala_query_live' table to the database 'sys', which already exists for 'sys.impala_query_log'. Implements the 'impala_query_live' table to view active queries across all coordinators sharing the same statestore. SystemTables create new SystemTableScanNodes for their scan node implementation. When computing scan range locations, SystemTableScanNodes create a scan range for each coordinator in the cluster (identified via ClusterMembershipMgr). This produces a plan that looks like: Query: explain select * from sys.impala_query_live +------------------------------------------------------------+ | Explain String | +------------------------------------------------------------+ | Max Per-Host Resource Reservation: Memory=4.00MB Threads=2 | | Per-Host Resource Estimates: Memory=11MB | | WARNING: The following tables are missing relevant table | | and/or column statistics. | | sys.impala_query_live | | | | PLAN-ROOT SINK | | | | | 01:EXCHANGE [UNPARTITIONED] | | | | | 00:SCAN SYSTEM_TABLE [sys.impala_query_live] | | row-size=72B cardinality=20 | +------------------------------------------------------------+ Impala's scheduler checks whether the query contains fragments that can be scheduled on coordinators, and if present includes an ExecutorGroup containing all coordinators. These are used to schedule scan ranges that are flagged as 'use_coordinator', allowing SystemTableScanNodes to be scheduled on dedicated coordinators and outside the selected executor group. Execution will pull data from ImpalaServer on the backend via a SystemTableScanner implementation based on table name. 
In the query profile, SYSTEM_TABLE_SCAN_NODE includes ActiveQueryCollectionTime and PendingQueryCollectionTime to track time spent collecting QueryState from ImpalaServer. Grants QueryScanner private access to ImpalaServer, identical to how ImpalaHttpHandler access internal server state. Adds custom cluster tests for impala_query_live, and unit tests for changes to planner and scheduler. Change-Id: Ie2f9a449f0e5502078931e7f1c5df6e0b762c743 Reviewed-on: http://gerrit.cloudera.org:8080/20762 Reviewed-by: Jason Fehr <[email protected]> Reviewed-by: Riza Suminto <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/CMakeLists.txt | 2 + be/src/exec/exec-node.cc | 1 + be/src/exec/scan-node.cc | 4 + be/src/exec/system-table-scan-node.cc | 100 ++++++ be/src/exec/system-table-scan-node.h | 60 ++++ be/src/exec/system-table-scanner.cc | 348 +++++++++++++++++++++ be/src/exec/system-table-scanner.h | 83 +++++ be/src/runtime/descriptors.cc | 13 + be/src/runtime/descriptors.h | 10 + be/src/scheduling/admission-controller.cc | 6 +- be/src/scheduling/admission-controller.h | 2 + be/src/scheduling/cluster-membership-mgr-test.cc | 45 +++ be/src/scheduling/cluster-membership-mgr.cc | 27 ++ be/src/scheduling/cluster-membership-mgr.h | 4 + be/src/scheduling/scheduler-test-util.cc | 47 ++- be/src/scheduling/scheduler-test-util.h | 10 +- be/src/scheduling/scheduler-test.cc | 19 ++ be/src/scheduling/scheduler.cc | 80 ++++- be/src/scheduling/scheduler.h | 5 +- be/src/service/fe-support.cc | 38 +++ be/src/service/frontend.h | 2 +- be/src/service/impala-server.cc | 1 + be/src/service/impala-server.h | 12 + be/src/service/query-state-record.h | 2 +- be/src/service/workload-management-fields.cc | 1 + be/src/service/workload-management.cc | 16 + be/src/util/sharded-query-map-util.h | 10 + common/protobuf/planner.proto | 1 + common/thrift/CMakeLists.txt | 1 + common/thrift/CatalogObjects.thrift | 16 + common/thrift/Descriptors.thrift | 1 + 
common/thrift/PlanNodes.thrift | 9 + common/thrift/Query.thrift | 3 + common/thrift/SystemTables.thrift | 71 +++++ common/thrift/Types.thrift | 5 + .../java/org/apache/impala/analysis/Analyzer.java | 15 +- .../apache/impala/analysis/DescribeTableStmt.java | 9 + .../impala/analysis/ShowCreateTableStmt.java | 3 + .../authorization/BaseAuthorizationChecker.java | 4 + .../impala/catalog/CatalogServiceCatalog.java | 7 +- fe/src/main/java/org/apache/impala/catalog/Db.java | 11 + .../org/apache/impala/catalog/SystemTable.java | 217 +++++++++++++ .../apache/impala/planner/SingleNodePlanner.java | 8 +- .../apache/impala/planner/SystemTableScanNode.java | 150 +++++++++ .../org/apache/impala/service/BackendConfig.java | 5 + .../java/org/apache/impala/service/FeSupport.java | 24 ++ .../java/org/apache/impala/service/Frontend.java | 7 + .../apache/impala/util/CatalogBlacklistUtils.java | 5 +- .../org/apache/impala/catalog/SystemTableTest.java | 38 +++ .../org/apache/impala/planner/PlannerTest.java | 13 + .../org/apache/impala/planner/PlannerTestBase.java | 5 +- .../queries/PlannerTest/impala-query-live.test | 34 ++ tests/custom_cluster/test_query_live.py | 331 ++++++++++++++++++++ tests/util/workload_management.py | 3 +- 54 files changed, 1909 insertions(+), 35 deletions(-) diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index c98b8f70a..63c9ac595 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -101,6 +101,8 @@ add_library(Exec sort-node.cc streaming-aggregation-node.cc subplan-node.cc + system-table-scan-node.cc + system-table-scanner.cc table-sink-base.cc text-converter.cc topn-node.cc diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 8130378ac..8b38e67c0 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -176,6 +176,7 @@ Status PlanNode::CreatePlanNode( case TPlanNodeType::HBASE_SCAN_NODE: case TPlanNodeType::DATA_SOURCE_NODE: case TPlanNodeType::KUDU_SCAN_NODE: + case 
TPlanNodeType::SYSTEM_TABLE_SCAN_NODE: *node = pool->Add(new ScanPlanNode()); break; case TPlanNodeType::AGGREGATION_NODE: diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc index 3b7935f20..27bd0445a 100644 --- a/be/src/exec/scan-node.cc +++ b/be/src/exec/scan-node.cc @@ -24,6 +24,7 @@ #include "exec/hbase/hbase-scan-node.h" #include "exec/kudu/kudu-scan-node-mt.h" #include "exec/kudu/kudu-scan-node.h" +#include "exec/system-table-scan-node.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" #include "runtime/blocking-row-batch-queue.h" @@ -135,6 +136,9 @@ Status ScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const *node = pool->Add(new KuduScanNode(pool, *this, state->desc_tbl())); } break; + case TPlanNodeType::SYSTEM_TABLE_SCAN_NODE: + *node = pool->Add(new SystemTableScanNode(pool, *this, state->desc_tbl())); + break; default: DCHECK(false) << "Unexpected scan node type: " << tnode_->node_type; } diff --git a/be/src/exec/system-table-scan-node.cc b/be/src/exec/system-table-scan-node.cc new file mode 100644 index 000000000..8cf7748d1 --- /dev/null +++ b/be/src/exec/system-table-scan-node.cc @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "system-table-scan-node.h" + +#include "exec/exec-node.inline.h" +#include "exec/system-table-scanner.h" +#include "runtime/descriptors.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" + +#include "common/names.h" + +namespace impala { + +Status SystemTableScanNode::Prepare(RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::Prepare(state)); + tuple_desc_ = state->desc_tbl().GetTupleDescriptor( + plan_node().tnode_->system_table_scan_node.tuple_id); + DCHECK(tuple_desc_ != nullptr); + return Status::OK(); +} + +Status SystemTableScanNode::Open(RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::Open(state)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + SCOPED_TIMER(runtime_profile_->total_time_counter()); + + RETURN_IF_ERROR(SystemTableScanner::CreateScanner(state, runtime_profile(), + plan_node().tnode_->system_table_scan_node.table_name, &scanner_)); + RETURN_IF_ERROR(scanner_->Open()); + return Status::OK(); +} + +Status SystemTableScanNode::MaterializeNextTuple(MemPool* tuple_pool, Tuple* tuple) { + tuple->Init(tuple_desc_->byte_size()); + RETURN_IF_ERROR(scanner_->MaterializeNextTuple(tuple_pool, tuple, tuple_desc_)); + return Status::OK(); +} + +Status SystemTableScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { + RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + SCOPED_TIMER(runtime_profile_->total_time_counter()); + + int64_t tuple_buffer_size; + uint8_t* tuple_mem; + RETURN_IF_ERROR( + row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_mem)); + + SCOPED_TIMER(materialize_tuple_timer()); + + // copy rows until we hit the limit/capacity or until we exhaust input batch + while (!ReachedLimit() && !row_batch->AtCapacity() && !scanner_->eos()) { + Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem); + 
RETURN_IF_ERROR(MaterializeNextTuple(row_batch->tuple_data_pool(), tuple)); + TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow()); + tuple_row->SetTuple(0, tuple); + + if (ExecNode::EvalConjuncts( + conjunct_evals_.data(), conjunct_evals_.size(), tuple_row)) { + row_batch->CommitLastRow(); + tuple_mem += tuple_desc_->byte_size(); + IncrementNumRowsReturned(1); + } + } + COUNTER_SET(rows_returned_counter_, rows_returned()); + *eos = ReachedLimit() || scanner_->eos(); + return Status::OK(); +} + +Status SystemTableScanNode::Reset(RuntimeState* state, RowBatch* row_batch) { + DCHECK(false) << "NYI"; + return Status("NYI"); +} + +void SystemTableScanNode::Close(RuntimeState* state) { + if (is_closed()) return; + scanner_.reset(); + ScanNode::Close(state); +} + +} // namespace impala diff --git a/be/src/exec/system-table-scan-node.h b/be/src/exec/system-table-scan-node.h new file mode 100644 index 000000000..6a633b612 --- /dev/null +++ b/be/src/exec/system-table-scan-node.h @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "exec/scan-node.h" +#include "exec/system-table-scanner.h" + +namespace impala { + +class SystemTableScanNode : public ScanNode { + /// A scan node that exposes Impala system state as a table. + /// + /// Different SystemTableScanner subclasses gather data from different Impala subsystems + /// and materialize it into Tuples. + /// e.g. QueryScanner materializes active queries into tuples. + public: + using ScanNode::ScanNode; + + /// Create schema and columns to slots mapping. + Status Prepare(RuntimeState* state) override; + + /// Start scan. + Status Open(RuntimeState* state) override; + + /// Fill the next row batch by fetching more data from the scanner. + Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + + /// NYI + Status Reset(RuntimeState* state, RowBatch* row_batch) override; + + /// Close after scan is finished + void Close(RuntimeState* state) override; + + private: + // Used to scan gather data of system table from ExecEnv + std::unique_ptr<SystemTableScanner> scanner_; + + /// Descriptor of tuples read from SystemTable + const TupleDescriptor* tuple_desc_ = nullptr; + + /// Materializes the next row into tuple. + Status MaterializeNextTuple(MemPool* mem_pool, Tuple* tuple); +}; + +} diff --git a/be/src/exec/system-table-scanner.cc b/be/src/exec/system-table-scanner.cc new file mode 100644 index 000000000..3d0dbad6b --- /dev/null +++ b/be/src/exec/system-table-scanner.cc @@ -0,0 +1,348 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "system-table-scanner.h" + +#include <memory> +#include <boost/algorithm/string.hpp> +#include <gutil/strings/substitute.h> + +#include "gen-cpp/SystemTables_types.h" +#include "runtime/exec-env.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" +#include "runtime/query-driver.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/timestamp-value.h" +#include "runtime/timestamp-value.inline.h" +#include "runtime/tuple-row.h" +#include "service/query-state-record.h" +#include "util/debug-util.h" +#include "common/names.h" + +using namespace boost::algorithm; +using strings::Substitute; + +DECLARE_string(cluster_id); + +namespace impala { + +static const string ERROR_MEM_LIMIT_EXCEEDED = + "SystemTableScanNode::$0() failed to allocate $1 bytes."; + +Status SystemTableScanner::CreateScanner(RuntimeState* state, RuntimeProfile* profile, + TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>* scanner) { + switch (table_name) { + case TSystemTableName::QUERY_LIVE: + *scanner = make_unique<QueryScanner>(state, profile); + break; + default: + return Status(ErrorMsg(TErrorCode::NOT_IMPLEMENTED_ERROR, + Substitute("Unknown table name: $0", table_name))); + } + return Status::OK(); +} + +/// Write a string value to a STRING slot, allocating memory from 'pool'. Returns +/// an error if memory cannot be allocated without exceeding a memory limit. 
+Status SystemTableScanner::WriteStringSlot( + const char* data, int len, MemPool* pool, void* slot) { + char* buffer = reinterpret_cast<char*>(pool->TryAllocateUnaligned(len)); + if (UNLIKELY(buffer == nullptr)) { + string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "WriteStringSlot", len); + return pool->mem_tracker()->MemLimitExceeded(state_, details, len); + } + memcpy(buffer, data, len); + reinterpret_cast<StringValue*>(slot)->Assign(buffer, len); + return Status::OK(); +} + +Status SystemTableScanner::WriteStringSlot(const string& str, MemPool* pool, void* slot) { + return WriteStringSlot(str.data(), str.size(), pool, slot); +} + +static void WriteUnixTimestampSlot(int64_t unix_time_micros, void* slot) { + *reinterpret_cast<TimestampValue*>(slot) = + TimestampValue::UtcFromUnixTimeMicros(unix_time_micros); +} + +static void WriteBigIntSlot(int64_t value, void* slot) { + *reinterpret_cast<int64_t*>(slot) = value; +} + +static void WriteIntSlot(int32_t value, void* slot) { + *reinterpret_cast<int32_t*>(slot) = value; +} + +QueryScanner::QueryScanner(RuntimeState* state, RuntimeProfile* profile) + : SystemTableScanner(state, profile), + active_query_collection_timer_(ADD_TIMER(profile_, "ActiveQueryCollectionTime")), + pending_query_collection_timer_(ADD_TIMER(profile_, "PendingQueryCollectionTime")) + {} + +Status QueryScanner::Open() { + ImpalaServer* server = ExecEnv::GetInstance()->impala_server(); + + // Get a sorted list of state snapshots for all active queries. This mimics the + // behavior of ImpalaHttpHandler::QueryStateHandler. Using snapshots avoids potential + // memory violations of trying to use a ClientRequestState pointer without holding a + // shared_ptr to the QueryDriver and avoids keeping the query open longer than + // necessary (via that shared_ptr). The cost is that we allocate memory for the + // QueryStateRecords, which should be relatively small. 
+ { + SCOPED_TIMER(active_query_collection_timer_); + server->query_driver_map_.DoFuncForAllEntries( + [&](const std::shared_ptr<QueryDriver>& query_driver) { + query_records_.emplace_back(make_shared<QueryStateExpanded>( + *query_driver->GetActiveClientRequestState())); + }); + } + + unordered_set<TUniqueId> running_queries; + running_queries.reserve(query_records_.size()); + for (const auto& r : query_records_) { + running_queries.insert(r->base_state->id); + } + + // It's possible for a query to appear in both query_driver_map_ and completed_queries_ + // if it's been added to completed_queries_ in CloseClientRequestState and has not yet + // been removed from query_driver_map_ in QueryDriver::Unregister. Collection order + // ensures we don't miss one by collecting before it's been added to completed_queries_, + // then after it's added to completed_queries_ and removed from query_driver_map_. + // Avoid adding entries if they already exist. + { + SCOPED_TIMER(pending_query_collection_timer_); + for (const auto& r : server->GetCompletedQueries()) { + if (running_queries.find(r->base_state->id) == running_queries.end()) { + query_records_.emplace_back(r); + } + } + } + + if (query_records_.empty()) eos_ = true; + return Status::OK(); +} + +static void WriteEvent(const QueryStateExpanded& query, void* slot, QueryEvent name) { + const auto& event = query.events.find(name); + DCHECK(event != query.events.end()); + WriteBigIntSlot(event->second, slot); +} + +Status QueryScanner::MaterializeNextTuple( + MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc) { + DCHECK(!query_records_.empty()); + const QueryStateExpanded& query = *query_records_.front(); + const QueryStateRecord& record = *query.base_state; + ExecEnv* exec_env = ExecEnv::GetInstance(); + for (const SlotDescriptor* slot_desc : tuple_desc->slots()) { + void* slot = tuple->GetSlot(slot_desc->tuple_offset()); + + switch (slot_desc->col_pos()) { + case TQueryTableColumn::CLUSTER_ID: + 
RETURN_IF_ERROR(WriteStringSlot(FLAGS_cluster_id, pool, slot)); + break; + case TQueryTableColumn::QUERY_ID: + RETURN_IF_ERROR(WriteStringSlot(PrintId(record.id), pool, slot)); + break; + case TQueryTableColumn::SESSION_ID: + RETURN_IF_ERROR(WriteStringSlot(PrintId(query.session_id), pool, slot)); + break; + case TQueryTableColumn::SESSION_TYPE: + RETURN_IF_ERROR(WriteStringSlot(to_string(query.session_type), pool, slot)); + break; + case TQueryTableColumn::HIVESERVER2_PROTOCOL_VERSION: + if (query.session_type == TSessionType::HIVESERVER2) { + RETURN_IF_ERROR(WriteStringSlot( + Substitute("V$0", 1 + query.hiveserver2_protocol_version), pool, slot)); + } + break; + case TQueryTableColumn::DB_USER: + RETURN_IF_ERROR(WriteStringSlot(record.effective_user, pool, slot)); + break; + case TQueryTableColumn::DB_USER_CONNECTION: + RETURN_IF_ERROR(WriteStringSlot(query.db_user_connection, pool, slot)); + break; + case TQueryTableColumn::DB_NAME: + RETURN_IF_ERROR(WriteStringSlot(record.default_db, pool, slot)); + break; + case TQueryTableColumn::IMPALA_COORDINATOR: + RETURN_IF_ERROR(WriteStringSlot( + TNetworkAddressToString(exec_env->configured_backend_address()), pool, slot)); + break; + case TQueryTableColumn::QUERY_STATUS: + RETURN_IF_ERROR(WriteStringSlot(record.query_status.ok() ? 
+ "OK" : record.query_status.msg().msg(), pool, slot)); + break; + case TQueryTableColumn::QUERY_STATE: + RETURN_IF_ERROR(WriteStringSlot(record.query_state, pool, slot)); + break; + case TQueryTableColumn::IMPALA_QUERY_END_STATE: + RETURN_IF_ERROR(WriteStringSlot(query.impala_query_end_state, pool, slot)); + break; + case TQueryTableColumn::QUERY_TYPE: + RETURN_IF_ERROR(WriteStringSlot(to_string(record.stmt_type), pool, slot)); + break; + case TQueryTableColumn::NETWORK_ADDRESS: + RETURN_IF_ERROR(WriteStringSlot( + TNetworkAddressToString(query.client_address), pool, slot)); + break; + case TQueryTableColumn::START_TIME_UTC: + WriteUnixTimestampSlot(record.start_time_us, slot); + break; + case TQueryTableColumn::TOTAL_TIME_NS: { + const int64_t end_time_us = + record.end_time_us > 0 ? record.end_time_us : UnixMicros(); + const int64_t duration_us = end_time_us - record.start_time_us; + WriteBigIntSlot(duration_us * 1000, slot); + break; + } + case TQueryTableColumn::QUERY_OPTS_CONFIG: + RETURN_IF_ERROR(WriteStringSlot( + DebugQueryOptions(query.query_options), pool, slot)); + break; + case TQueryTableColumn::RESOURCE_POOL: + RETURN_IF_ERROR(WriteStringSlot(record.resource_pool, pool, slot)); + break; + case TQueryTableColumn::PER_HOST_MEM_ESTIMATE: + WriteBigIntSlot(query.per_host_mem_estimate, slot); + break; + case TQueryTableColumn::DEDICATED_COORD_MEM_ESTIMATE: + WriteBigIntSlot(query.dedicated_coord_mem_estimate, slot); + break; + case TQueryTableColumn::PER_HOST_FRAGMENT_INSTANCES: + if (!query.per_host_state.empty()) { + stringstream ss; + for (const auto& state : query.per_host_state) { + ss << TNetworkAddressToString(state.first) << "=" + << state.second.fragment_instance_count << ","; + } + string s = ss.str(); + s.pop_back(); + RETURN_IF_ERROR(WriteStringSlot(s, pool, slot)); + } + break; + case TQueryTableColumn::BACKENDS_COUNT: + DCHECK_LE(query.per_host_state.size(), numeric_limits<int32_t>::max()); + WriteIntSlot(query.per_host_state.size(), slot); 
+ break; + case TQueryTableColumn::ADMISSION_RESULT: + RETURN_IF_ERROR(WriteStringSlot(query.admission_result, pool, slot)); + break; + case TQueryTableColumn::CLUSTER_MEMORY_ADMITTED: + WriteBigIntSlot(record.cluster_mem_est, slot); + break; + case TQueryTableColumn::EXECUTOR_GROUP: + RETURN_IF_ERROR(WriteStringSlot(query.executor_group, pool, slot)); + break; + case TQueryTableColumn::EXECUTOR_GROUPS: + RETURN_IF_ERROR(WriteStringSlot(query.executor_groups, pool, slot)); + break; + case TQueryTableColumn::EXEC_SUMMARY: + RETURN_IF_ERROR(WriteStringSlot(query.exec_summary, pool, slot)); + break; + case TQueryTableColumn::NUM_ROWS_FETCHED: + WriteBigIntSlot(record.num_rows_fetched, slot); + break; + case TQueryTableColumn::ROW_MATERIALIZATION_ROWS_PER_SEC: + WriteBigIntSlot(query.row_materialization_rate, slot); + break; + case TQueryTableColumn::ROW_MATERIALIZATION_TIME_NS: + WriteBigIntSlot(query.row_materialization_time, slot); + break; + case TQueryTableColumn::COMPRESSED_BYTES_SPILLED: + WriteBigIntSlot(query.compressed_bytes_spilled, slot); + break; + case TQueryTableColumn::EVENT_PLANNING_FINISHED: + WriteEvent(query, slot, QueryEvent::PLANNING_FINISHED); + break; + case TQueryTableColumn::EVENT_SUBMIT_FOR_ADMISSION: + WriteEvent(query, slot, QueryEvent::SUBMIT_FOR_ADMISSION); + break; + case TQueryTableColumn::EVENT_COMPLETED_ADMISSION: + WriteEvent(query, slot, QueryEvent::COMPLETED_ADMISSION); + break; + case TQueryTableColumn::EVENT_ALL_BACKENDS_STARTED: + WriteEvent(query, slot, QueryEvent::ALL_BACKENDS_STARTED); + break; + case TQueryTableColumn::EVENT_ROWS_AVAILABLE: + WriteEvent(query, slot, QueryEvent::ROWS_AVAILABLE); + break; + case TQueryTableColumn::EVENT_FIRST_ROW_FETCHED: + WriteEvent(query, slot, QueryEvent::FIRST_ROW_FETCHED); + break; + case TQueryTableColumn::EVENT_LAST_ROW_FETCHED: + WriteEvent(query, slot, QueryEvent::LAST_ROW_FETCHED); + break; + case TQueryTableColumn::EVENT_UNREGISTER_QUERY: + WriteEvent(query, slot, 
QueryEvent::UNREGISTER_QUERY); + break; + case TQueryTableColumn::READ_IO_WAIT_TOTAL_NS: + WriteBigIntSlot(query.read_io_wait_time_total, slot); + break; + case TQueryTableColumn::READ_IO_WAIT_MEAN_NS: + WriteBigIntSlot(query.read_io_wait_time_mean, slot); + break; + case TQueryTableColumn::BYTES_READ_CACHE_TOTAL: + WriteBigIntSlot(query.bytes_read_cache_total, slot); + break; + case TQueryTableColumn::BYTES_READ_TOTAL: + WriteBigIntSlot(query.bytes_read_total, slot); + break; + case TQueryTableColumn::PERNODE_PEAK_MEM_MIN: + if (auto min_elem = min_element(query.per_host_state.cbegin(), + query.per_host_state.cend(), PerHostPeakMemoryComparator); + LIKELY(min_elem != query.per_host_state.cend())) { + WriteBigIntSlot(min_elem->second.peak_memory_usage, slot); + } + break; + case TQueryTableColumn::PERNODE_PEAK_MEM_MAX: + if (auto max_elem = max_element(query.per_host_state.cbegin(), + query.per_host_state.cend(), PerHostPeakMemoryComparator); + LIKELY(max_elem != query.per_host_state.cend())) { + WriteBigIntSlot(max_elem->second.peak_memory_usage, slot); + } + break; + case TQueryTableColumn::PERNODE_PEAK_MEM_MEAN: + if (LIKELY(!query.per_host_state.empty())) { + int64_t calc_mean = 0; + for (const auto& host : query.per_host_state) { + calc_mean += host.second.peak_memory_usage; + } + calc_mean /= query.per_host_state.size(); + WriteBigIntSlot(calc_mean, slot); + } + break; + case TQueryTableColumn::SQL: + RETURN_IF_ERROR(WriteStringSlot(record.stmt, pool, slot)); + break; + case TQueryTableColumn::PLAN: + RETURN_IF_ERROR(WriteStringSlot( + trim_left_copy_if(record.plan, is_any_of("\n")), pool, slot)); + break; + default: + DCHECK(false) << "Unknown column position " << slot_desc->col_pos(); + } + } + + query_records_.pop_front(); + if (query_records_.empty()) eos_ = true; + return Status::OK(); +} + +} /* namespace impala */ diff --git a/be/src/exec/system-table-scanner.h b/be/src/exec/system-table-scanner.h new file mode 100644 index 000000000..92fbd6a78 --- 
/dev/null +++ b/be/src/exec/system-table-scanner.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/scan-node.h" + +namespace impala { + +struct QueryStateExpanded; + +// SystemTableScanner is the generic interface for different implementations of +// system table scanning. +class SystemTableScanner { + public: + static Status CreateScanner(RuntimeState* state, RuntimeProfile* profile, + TSystemTableName::type table_name, std::unique_ptr<SystemTableScanner>* scanner); + + virtual ~SystemTableScanner() = default; + + /// Start scan, load data needed + virtual Status Open() = 0; + + /// Fill the next row batch by fetching more data from system table data source. + virtual Status MaterializeNextTuple( + MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc_) = 0; + + bool eos() const noexcept { return eos_; } + + protected: + SystemTableScanner(RuntimeState* state, RuntimeProfile* profile) + : state_(state), profile_(profile), eos_(false) {} + + /// Write a string value to a STRING slot, allocating memory from 'pool'. Returns + /// an error if memory cannot be allocated without exceeding a memory limit. 
+ Status WriteStringSlot(const char* data, int len, MemPool* pool, void* slot); + Status WriteStringSlot(const std::string& str, MemPool* pool, void* slot); + + RuntimeState* const state_; + + RuntimeProfile* const profile_; + + /// if true, nothing left to return in getNext() in SystemTableScanNode + bool eos_; +}; + +class QueryScanner : public SystemTableScanner { + public: + QueryScanner(RuntimeState* state, RuntimeProfile* profile); + + /// Start scan, load list of query IDs into active_query_ids_. + virtual Status Open(); + + /// Fill the next row batch by fetching query state from ImpalaServer. + virtual Status MaterializeNextTuple( + MemPool* pool, Tuple* tuple, const TupleDescriptor* tuple_desc_); + + private: + /// Snapshot of query state for queries that are active during Open. + std::deque<std::shared_ptr<QueryStateExpanded>> query_records_; + + /// Time spent in Open collecting active query state. + RuntimeProfile::Counter* active_query_collection_timer_; + + /// Time spent in Open collecting completed but not yet written query state. 
+ RuntimeProfile::Counter* pending_query_collection_timer_; +}; + +} /* namespace impala */ diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc index aa716cca8..a1002b342 100644 --- a/be/src/runtime/descriptors.cc +++ b/be/src/runtime/descriptors.cc @@ -342,6 +342,16 @@ string KuduTableDescriptor::DebugString() const { return out.str(); } +SystemTableDescriptor::SystemTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc), table_name_(tdesc.systemTable.table_name) {} + +string SystemTableDescriptor::DebugString() const { + stringstream out; + out << "SystemTable(" << TableDescriptor::DebugString() << " table=" << table_name_ + << ")"; + return out.str(); +} + TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc) : id_(tdesc.id), byte_size_(tdesc.byteSize), @@ -597,6 +607,9 @@ Status DescriptorTbl::CreateTblDescriptorInternal(const TTableDescriptor& tdesc, case TTableType::KUDU_TABLE: *desc = pool->Add(new KuduTableDescriptor(tdesc)); break; + case TTableType::SYSTEM_TABLE: + *desc = pool->Add(new SystemTableDescriptor(tdesc)); + break; default: DCHECK(false) << "invalid table type: " << tdesc.tableType; } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index a73420205..dd728525e 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -575,6 +575,16 @@ class KuduTableDescriptor : public TableDescriptor { std::vector<std::string> master_addresses_; }; +// Descriptor for a SystemTable +class SystemTableDescriptor : public TableDescriptor { + public: + SystemTableDescriptor(const TTableDescriptor& tdesc); + virtual std::string DebugString() const; + + private: + TSystemTableName::type table_name_; +}; + class TupleDescriptor { public: int byte_size() const { return byte_size_; } diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc index 7f448e115..6c614fc8e 100644 --- a/be/src/scheduling/admission-controller.cc +++ 
b/be/src/scheduling/admission-controller.cc @@ -1861,6 +1861,10 @@ Status AdmissionController::ComputeGroupScheduleStates( return Status::OK(); } + // Collect all coordinators if needed for the request. + ExecutorGroup coords = request.request.include_all_coordinators ? + membership_snapshot->GetCoordinators() : ExecutorGroup("all-coordinators"); + // We loop over the executor groups in a deterministic order. If // --balance_queries_across_executor_groups set to true, executor groups with more // available memory and slots will be processed first. If the flag set to false, we will @@ -1890,7 +1894,7 @@ Status AdmissionController::ComputeGroupScheduleStates( const string& group_name = executor_group->name(); VLOG(3) << "Scheduling for executor group: " << group_name << " with " << executor_group->NumExecutors() << " executors"; - const Scheduler::ExecutorConfig group_config = {*executor_group, coord_desc}; + const Scheduler::ExecutorConfig group_config = {*executor_group, coord_desc, coords}; RETURN_IF_ERROR(scheduler_->Schedule(group_config, group_state.get())); DCHECK(!group_state->executor_group().empty()); output_schedules->emplace_back(std::move(group_state), *orig_executor_group); diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h index 6c6841107..f72f5b8a0 100644 --- a/be/src/scheduling/admission-controller.h +++ b/be/src/scheduling/admission-controller.h @@ -948,6 +948,8 @@ class AdmissionController { /// an error if no executor groups are available for scheduling, but will set /// 'queue_node->not_admitted_reason' and leave 'queue_node->group_states' empty in /// that case. + /// If the associated TQueryExecRequest sets include_all_coordinators, an extra group + /// containing all active coordinators will be added to ExecutorConfig for scheduling. 
Status ComputeGroupScheduleStates( ClusterMembershipMgr::SnapshotPtr membership_snapshot, QueueNode* queue_node); diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc index fd86eead4..625c9e8f0 100644 --- a/be/src/scheduling/cluster-membership-mgr-test.cc +++ b/be/src/scheduling/cluster-membership-mgr-test.cc @@ -792,6 +792,51 @@ TEST(ClusterMembershipMgrUnitTest, PopulateExecutorMembershipRequest) { } } +template <class T> +static bool has(const vector<T>& v, const T& m) { + return find(v.begin(), v.end(), m) != v.end(); +} + +/// Test that we can get a list of coordinators. +TEST_F(ClusterMembershipMgrTest, GetCoordinatorAddresses) { + // Initialize all backends early. Test methods handle state propagation through the + // backends_ list, which must be fully initialized before starting backends. + Backend* coordinator0 = CreateBackend(); + const NetworkAddressPB& addr0 = coordinator0->desc->address(); + CreateCMM(coordinator0); + Backend* coordinator1 = CreateBackend(); + const NetworkAddressPB& addr1 = coordinator1->desc->address(); + coordinator1->desc->set_is_executor(false); + CreateCMM(coordinator1); + Backend* executor = CreateBackend(); + executor->desc->set_is_coordinator(false); + CreateCMM(executor); + + EXPECT_EQ(0, coordinator0->cmm->GetSnapshot()->GetCoordinatorAddresses().size()); + + StartBackend(coordinator0); + vector<TNetworkAddress> orig_coordinators = + coordinator0->cmm->GetSnapshot()->GetCoordinatorAddresses(); + EXPECT_EQ(1, orig_coordinators.size()); + EXPECT_EQ(FromNetworkAddressPB(addr0), orig_coordinators[0]); + + StartBackend(executor); + EXPECT_EQ(orig_coordinators, + coordinator0->cmm->GetSnapshot()->GetCoordinatorAddresses()); + EXPECT_EQ(orig_coordinators, executor->cmm->GetSnapshot()->GetCoordinatorAddresses()); + + StartBackend(coordinator1); + orig_coordinators = coordinator0->cmm->GetSnapshot()->GetCoordinatorAddresses(); + EXPECT_EQ(2, orig_coordinators.size()); + // 
List of coordinators is unsorted. + EXPECT_TRUE(has(orig_coordinators, FromNetworkAddressPB(addr0))); + EXPECT_TRUE(has(orig_coordinators, FromNetworkAddressPB(addr1))); + + EXPECT_EQ(orig_coordinators, executor->cmm->GetSnapshot()->GetCoordinatorAddresses()); + EXPECT_EQ(orig_coordinators, + coordinator1->cmm->GetSnapshot()->GetCoordinatorAddresses()); +} + /// TODO: Write a test that makes a number of random changes to cluster membership while /// not maintaining the proper lifecycle steps that a backend goes through (create, start, /// quiesce, delete). diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc index 426931a54..615a385f2 100644 --- a/be/src/scheduling/cluster-membership-mgr.cc +++ b/be/src/scheduling/cluster-membership-mgr.cc @@ -161,6 +161,33 @@ ClusterMembershipMgr::SnapshotPtr ClusterMembershipMgr::GetSnapshot() const { return state; } +static bool is_active_coordinator(const BackendDescriptorPB& be) { + return be.has_is_coordinator() && be.is_coordinator() && + !(be.has_is_quiescing() && be.is_quiescing()); +} + +ExecutorGroup ClusterMembershipMgr::Snapshot::GetCoordinators() const { + ExecutorGroup coordinators("all-coordinators"); + for (const auto& it : current_backends) { + if (is_active_coordinator(it.second)) { + coordinators.AddExecutor(it.second); + } + } + return coordinators; +} + +vector<TNetworkAddress> ClusterMembershipMgr::Snapshot::GetCoordinatorAddresses() const { + vector<TNetworkAddress> coordinators; + for (const auto& it : current_backends) { + if (is_active_coordinator(it.second)) { + VLOG_QUERY << "Found coordinator " + << it.second.address().hostname() << ":" << it.second.address().port(); + coordinators.emplace_back(FromNetworkAddressPB(it.second.address())); + } + } + return coordinators; +} + void ClusterMembershipMgr::UpdateMembership( const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas, vector<TTopicDelta>* subscriber_topic_updates) { diff --git 
a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h index ff698f1b5..8c82b9972 100644 --- a/be/src/scheduling/cluster-membership-mgr.h +++ b/be/src/scheduling/cluster-membership-mgr.h @@ -85,6 +85,10 @@ class ClusterMembershipMgr { struct Snapshot { Snapshot() = default; Snapshot(const Snapshot&) = default; + /// Returns an executor group of all non-quiescing coordinators in the cluster. + ExecutorGroup GetCoordinators() const; + /// Returns the addresses of all non-quiescing coordinators in the cluster. + std::vector<TNetworkAddress> GetCoordinatorAddresses() const; /// The current backend descriptor of the local backend. BeDescSharedPtr local_be_desc; /// Map from unique backend ID to BackendDescriptorPB for all known backends, diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc index 97c32e724..580240a8d 100644 --- a/be/src/scheduling/scheduler-test-util.cc +++ b/be/src/scheduling/scheduler-test-util.cc @@ -281,6 +281,23 @@ const TScanRangeSpec& Plan::scan_range_specs() const { return scan_range_specs_; } +void Plan::AddSystemTableScan() { + for (const Host& host : cluster().hosts()) { + // SystemTable host_list uses coordinator addresses. 
+ TNetworkAddress node; + node.hostname = host.ip; + node.port = host.be_port; + int32_t host_idx = referenced_datanodes_.size(); + referenced_datanodes_.push_back(node); + + TScanRangeLocationList scan_range_locations; + scan_range_locations.scan_range.__set_is_system_scan(true); + scan_range_locations.locations.resize(1); + scan_range_locations.locations[0].host_idx = host_idx; + scan_range_specs_.concrete_ranges.push_back(scan_range_locations); + } +} + void Plan::AddTableScan(const TableName& table_name) { const Table& table = schema_.GetTable(table_name); const vector<Block>& blocks = table.blocks; @@ -564,8 +581,13 @@ void Result::ProcessAssignments(const AssignmentCallback& cb) const { per_node_ranges_elem.second; for (const ScanRangeParamsPB& scan_range_params : scan_range_params_vector) { const ScanRangePB& scan_range = scan_range_params.scan_range(); - DCHECK(scan_range.has_hdfs_file_split()); - const HdfsFileSplitPB& hdfs_file_split = scan_range.hdfs_file_split(); + HdfsFileSplitPB hdfs_file_split; + if (scan_range.has_hdfs_file_split()) { + hdfs_file_split = scan_range.hdfs_file_split(); + } else { + DCHECK(scan_range.has_is_system_scan() && scan_range.is_system_scan()); + hdfs_file_split.set_length(0); + } bool try_hdfs_cache = scan_range_params.has_try_hdfs_cache() ? scan_range_params.try_hdfs_cache() : false; bool is_remote = @@ -619,7 +641,8 @@ SchedulerWrapper::SchedulerWrapper(const Plan& plan) InitializeScheduler(); } -Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) { +Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result, + bool include_all_coordinators) { DCHECK(scheduler_ != nullptr); // Compute Assignment. 
@@ -644,17 +667,29 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) { cluster_membership_mgr_->GetSnapshot(); auto it = membership_snapshot->executor_groups.find( ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME); + + ExecutorGroup coords("all-coordinators"); + if (include_all_coordinators) { + for (const auto& it : membership_snapshot->current_backends) { + if (it.second.has_is_coordinator() && it.second.is_coordinator()) { + coords.AddExecutor(it.second); + LOG(INFO) << "Adding " << it.second.address() << " to all coordinators"; + } + } + LOG(INFO) << "Added " << coords.NumExecutors() << " to all coordinators"; + } + // If a group does not exist (e.g. no executors are registered), we pass an empty group // to the scheduler to exercise its error handling logic. bool no_executor_group = it == membership_snapshot->executor_groups.end(); ExecutorGroup empty_group("empty-group"); DCHECK(membership_snapshot->local_be_desc.get() != nullptr); - Scheduler::ExecutorConfig executor_config = - {no_executor_group ? empty_group : it->second, *membership_snapshot->local_be_desc}; + Scheduler::ExecutorConfig executor_config = {no_executor_group ? empty_group : + it->second, *membership_snapshot->local_be_desc, coords}; std::mt19937 rng(rand()); return scheduler_->ComputeScanRangeAssignment(executor_config, 0, nullptr, false, *locations, plan_.referenced_datanodes(), exec_at_coord, plan_.query_options(), - nullptr, &rng, assignment); + nullptr, &rng, nullptr, assignment); } void SchedulerWrapper::AddBackend(const Host& host) { diff --git a/be/src/scheduling/scheduler-test-util.h b/be/src/scheduling/scheduler-test-util.h index 7696e2c2b..1806d5268 100644 --- a/be/src/scheduling/scheduler-test-util.h +++ b/be/src/scheduling/scheduler-test-util.h @@ -310,6 +310,9 @@ class Plan { const TScanRangeSpec& scan_range_specs() const; + /// Add a scan across all coordinators. + void AddSystemTableScan(); + /// Add a scan of table 'table_name' to the plan. 
This method will populate the internal /// TScanRangeSpecs and can be called multiple times for the same table to schedule /// additional scans. @@ -513,10 +516,13 @@ class SchedulerWrapper { SchedulerWrapper(const Plan& plan); /// Call ComputeScanRangeAssignment() with exec_at_coord set to false. - Status Compute(Result* result) { return Compute(false, result); } + Status Compute(Result* result, bool include_all_coordinators = false) { + return Compute(false, result, include_all_coordinators); + } /// Call ComputeScanRangeAssignment(). - Status Compute(bool exec_at_coord, Result* result); + Status Compute(bool exec_at_coord, Result* result, + bool include_all_coordinators = false); /// Reset the state of the scheduler by re-creating and initializing it. void Reset() { InitializeScheduler(); } diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc index b1ee871df..d29f4e7f7 100644 --- a/be/src/scheduling/scheduler-test.cc +++ b/be/src/scheduling/scheduler-test.cc @@ -111,6 +111,25 @@ TEST_F(SchedulerTest, ExecAtCoord) { EXPECT_EQ(0, result.NumTotalAssignedBytes(2)); } +/// Test cluster configuration with one coordinator that can't process scan ranges +/// when scheduling to coordinators is enabled. +TEST_F(SchedulerTest, UseDedicatedCoordinator) { + Cluster cluster; + cluster.AddHost(true, true, false); + cluster.AddHost(true, true, true); + cluster.AddHost(true, true, true); + + Schema schema(cluster); + Plan plan(schema); + plan.AddSystemTableScan(); + + Result result(plan); + SchedulerWrapper scheduler(plan); + ASSERT_OK(scheduler.Compute(&result, true)); + + EXPECT_EQ(3, result.NumDistinctBackends()); +} + /// Test scanning a simple table twice. 
TEST_F(SchedulerTest, ScanTableTwice) { Cluster cluster; diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index 287407389..392f416fc 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -65,7 +65,8 @@ static const string SCHEDULER_WARNING_KEY("Scheduler Warning"); static const vector<TPlanNodeType::type> SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE, - TPlanNodeType::KUDU_SCAN_NODE, TPlanNodeType::ICEBERG_METADATA_SCAN_NODE}; + TPlanNodeType::KUDU_SCAN_NODE, TPlanNodeType::ICEBERG_METADATA_SCAN_NODE, + TPlanNodeType::SYSTEM_TABLE_SCAN_NODE}; // Consistent scheduling requires picking up to k distinct candidates out of n nodes. // Since each iteration can pick a node that it already picked (i.e. it is sampling with @@ -92,11 +93,18 @@ const BackendDescriptorPB& Scheduler::LookUpBackendDesc( const ExecutorConfig& executor_config, const NetworkAddressPB& host) { const BackendDescriptorPB* desc = executor_config.group.LookUpBackendDesc(host); if (desc == nullptr) { - // Coordinator host may not be in executor_config's executor group if it's a dedicated - // coordinator, or if it is configured to be in a different executor group. - const BackendDescriptorPB& coord_desc = executor_config.coord_desc; - DCHECK(host == coord_desc.address()); - desc = &coord_desc; + if (executor_config.all_coords.NumExecutors() > 0) { + // Group containing all coordinators was provided, so use that for lookup. + desc = executor_config.all_coords.LookUpBackendDesc(host); + DCHECK_NE(nullptr, desc); + } else { + // Coordinator host may not be in executor_config's executor group if it's a + // dedicated coordinator, or if it is configured to be in a different executor + // group. 
+ const BackendDescriptorPB& coord_desc = executor_config.coord_desc; + DCHECK(host == coord_desc.address()); + desc = &coord_desc; + } } return *desc; } @@ -212,7 +220,8 @@ Status Scheduler::ComputeScanRangeAssignment( RETURN_IF_ERROR( ComputeScanRangeAssignment(executor_config, node_id, node_replica_preference, node_random_replica, *locations, exec_request.host_list, exec_at_coord, - state->query_options(), total_assignment_timer, state->rng(), assignment)); + state->query_options(), total_assignment_timer, state->rng(), + state->summary_profile(), assignment)); state->IncNumScanRanges(locations->size()); } } @@ -376,6 +385,13 @@ void Scheduler::ComputeRandomKrpcForAggregation(const ExecutorConfig& executor_c } } +static inline void initializeSchedulerWarning(RuntimeProfile* summary_profile) { + if (summary_profile->GetInfoString(SCHEDULER_WARNING_KEY) == nullptr) { + summary_profile->AddInfoString(SCHEDULER_WARNING_KEY, + "Cluster membership might changed between planning and scheduling"); + } +} + Status Scheduler::CheckEffectiveInstanceCount( const FragmentScheduleState* fragment_state, ScheduleState* state) { // These checks are only intended if COMPUTE_PROCESSING_COST=true. 
@@ -383,11 +399,7 @@ Status Scheduler::CheckEffectiveInstanceCount( int effective_instance_count = fragment_state->fragment.effective_instance_count; if (effective_instance_count < fragment_state->instance_states.size()) { - if (state->summary_profile()->GetInfoString(SCHEDULER_WARNING_KEY) == nullptr) { - state->summary_profile()->AddInfoString(SCHEDULER_WARNING_KEY, - "Cluster membership might changed between planning and scheduling"); - } - + initializeSchedulerWarning(state->summary_profile()); string warn_message = Substitute( "$0 scheduled instance count ($1) is higher than its effective count ($2)", fragment_state->fragment.display_name, fragment_state->instance_states.size(), @@ -815,7 +827,7 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf bool node_random_replica, const vector<TScanRangeLocationList>& locations, const vector<TNetworkAddress>& host_list, bool exec_at_coord, const TQueryOptions& query_options, RuntimeProfile::Counter* timer, std::mt19937* rng, - FragmentScanRangeAssignment* assignment) { + RuntimeProfile* summary_profile, FragmentScanRangeAssignment* assignment) { const ExecutorGroup& executor_group = executor_config.group; if (executor_group.NumExecutors() == 0 && !exec_at_coord) { return Status(TErrorCode::NO_REGISTERED_BACKENDS); @@ -864,6 +876,28 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf coord_desc.address().hostname(), nullptr)); assignment_ctx.RecordScanRangeAssignment( coord_desc, node_id, host_list, scan_range_locations, assignment); + } else if (scan_range_locations.scan_range.__isset.is_system_scan && + scan_range_locations.scan_range.is_system_scan) { + // Must run on a coordinator, lookup in executor_config.all_coords + DCHECK_EQ(1, scan_range_locations.locations.size()); + const TNetworkAddress& coordinator = + host_list[scan_range_locations.locations[0].host_idx]; + const BackendDescriptorPB* coordinatorPB = + 
executor_config.all_coords.LookUpBackendDesc(FromTNetworkAddress(coordinator)); + if (coordinatorPB == nullptr) { + // Coordinator is no longer available, skip this range. + string warn_message = Substitute( + "Coordinator $0 is no longer available for system table scan assignment", + TNetworkAddressToString(coordinator)); + if (LIKELY(summary_profile != nullptr)) { + initializeSchedulerWarning(summary_profile); + summary_profile->AppendInfoString(SCHEDULER_WARNING_KEY, warn_message); + } + LOG(WARNING) << warn_message; + continue; + } + assignment_ctx.RecordScanRangeAssignment( + *coordinatorPB, node_id, host_list, scan_range_locations, assignment); } else { // Collect executor candidates with smallest memory distance. vector<IpAddr> executor_candidates; @@ -1387,6 +1421,9 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra if (tscan_range.__isset.file_metadata) { scan_range_pb->set_file_metadata(tscan_range.file_metadata); } + if (tscan_range.__isset.is_system_scan) { + scan_range_pb->set_is_system_scan(tscan_range.is_system_scan); + } } void Scheduler::AssignmentCtx::RecordScanRangeAssignment( @@ -1394,6 +1431,23 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment( const vector<TNetworkAddress>& host_list, const TScanRangeLocationList& scan_range_locations, FragmentScanRangeAssignment* assignment) { + if (scan_range_locations.scan_range.__isset.is_system_scan && + scan_range_locations.scan_range.is_system_scan) { + // Assigned to a coordinator. 
+ PerNodeScanRanges* scan_ranges = + FindOrInsert(assignment, executor.address(), PerNodeScanRanges()); + vector<ScanRangeParamsPB>* scan_range_params_list = + FindOrInsert(scan_ranges, node_id, vector<ScanRangeParamsPB>()); + ScanRangeParamsPB scan_range_params; + TScanRangeToScanRangePB( + scan_range_locations.scan_range, scan_range_params.mutable_scan_range()); + scan_range_params_list->push_back(scan_range_params); + + VLOG_FILE << "Scheduler assignment of system table scan to coordinator: " + << executor.address(); + return; + } + int64_t scan_range_length = 0; if (scan_range_locations.scan_range.__isset.hdfs_file_split) { scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length; diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index 7e116e503..7ed3baeee 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -66,6 +66,7 @@ class Scheduler { struct ExecutorConfig { const ExecutorGroup& group; const BackendDescriptorPB& coord_desc; + const ExecutorGroup& all_coords; }; /// Populates given query schedule and assigns fragments to hosts based on scan @@ -351,13 +352,15 @@ class Scheduler { /// query_options: Query options for the current query. /// timer: Tracks execution time of ComputeScanRangeAssignment. /// rng: Random number generated used for any random decisions + /// summary_profile: Summary profile for any scheduler warnings. /// assignment: Output parameter, to which new assignments will be added. 
Status ComputeScanRangeAssignment(const ExecutorConfig& executor_config, PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference, bool node_random_replica, const std::vector<TScanRangeLocationList>& locations, const std::vector<TNetworkAddress>& host_list, bool exec_at_coord, const TQueryOptions& query_options, RuntimeProfile::Counter* timer, - std::mt19937* rng, FragmentScanRangeAssignment* assignment); + std::mt19937* rng, RuntimeProfile* summary_profile, + FragmentScanRangeAssignment* assignment); /// Computes execution parameters for all backends assigned in the query and always one /// for the coordinator backend since it participates in execution regardless. Must be diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc index f4bee52e6..3182f320c 100644 --- a/be/src/service/fe-support.cc +++ b/be/src/service/fe-support.cc @@ -43,6 +43,7 @@ #include "runtime/mem-pool.h" #include "runtime/raw-value.h" #include "runtime/runtime-state.h" +#include "scheduling/cluster-membership-mgr.h" #include "service/impala-server.h" #include "service/query-options.h" #include "util/bloom-filter.h" @@ -643,6 +644,35 @@ Java_org_apache_impala_service_FeSupport_NativeParseQueryOptions( return result_bytes; } +// Get a list of known coordinators. 
+extern "C" JNIEXPORT jbyteArray JNICALL +Java_org_apache_impala_service_FeSupport_NativeGetCoordinators( + JNIEnv* env, jclass caller_class) { + ClusterMembershipMgr::SnapshotPtr membership_snapshot = + ExecEnv::GetInstance()->cluster_membership_mgr()->GetSnapshot(); + DCHECK(membership_snapshot != nullptr); + vector<TNetworkAddress> coordinators = membership_snapshot->GetCoordinatorAddresses(); + + TAddressesList addresses_container; + addresses_container.__set_addresses(coordinators); + jbyteArray result_bytes = nullptr; + THROW_IF_ERROR_RET(SerializeThriftMsg(env, &addresses_container, &result_bytes), env, + JniUtil::internal_exc_class(), result_bytes); + return result_bytes; +} + +// Get the number of live queries. +extern "C" JNIEXPORT jlong JNICALL +Java_org_apache_impala_service_FeSupport_NativeNumLiveQueries( + JNIEnv* env, jclass caller_class) { + ImpalaServer* server = ExecEnv::GetInstance()->impala_server(); + if (LIKELY(server != nullptr)) { + return server->NumLiveQueries(); + } + // Allow calling without an ImpalaServer, such as during PlannerTest. + return 0; +} + // Returns the log (base 2) of the minimum number of bytes we need for a Bloom filter // with 'ndv' unique elements and a false positive probability of less than 'fpp'. 
extern "C" @@ -818,6 +848,14 @@ static JNINativeMethod native_methods[] = { const_cast<char*>("NativeGetLatestCompactions"), const_cast<char*>("([B)[B"), (void*) ::Java_org_apache_impala_service_FeSupport_NativeGetLatestCompactions }, + { + const_cast<char*>("NativeGetCoordinators"), const_cast<char*>("()[B"), + (void*)::Java_org_apache_impala_service_FeSupport_NativeGetCoordinators + }, + { + const_cast<char*>("NativeNumLiveQueries"), const_cast<char*>("()J"), + (void*)::Java_org_apache_impala_service_FeSupport_NativeNumLiveQueries + }, }; void InitFeSupport(bool disable_codegen) { diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h index 363d2d519..e7aa7204e 100644 --- a/be/src/service/frontend.h +++ b/be/src/service/frontend.h @@ -248,7 +248,7 @@ class Frontend { jmethodID get_hadoop_groups_id_; // JniFrontend.getHadoopGroups() jmethodID check_config_id_; // JniFrontend.checkConfiguration() jmethodID update_catalog_cache_id_; // JniFrontend.updateCatalogCache(byte[][]) - jmethodID update_membership_id_; // JniFrontend.updateMembership() + jmethodID update_membership_id_; // JniFrontend.updateExecutorMembership() jmethodID get_catalog_metrics_id_; // JniFrontend.getCatalogMetrics() jmethodID get_table_names_id_; // JniFrontend.getTableNames jmethodID describe_db_id_; // JniFrontend.describeDb diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index 243c52843..efe0d6705 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1562,6 +1562,7 @@ void ImpalaServer::FinishUnregisterQuery(const QueryHandle& query_handle) { // Do all the finalization before removing the QueryDriver from the map so that // concurrent operations, e.g. GetRuntimeProfile() can find the query. CloseClientRequestState(query_handle); + DebugActionNoFail(query_handle->query_options(), "CLOSED_NOT_UNREGISTERED"); // Make the QueryDriver inaccessible. 
There is a time window where the query is // both in 'query_driver_map_' and 'query_locations_'. Status status = query_handle.query_driver()->Unregister(&query_driver_map_); diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h index 332b4725a..9cebfe0dc 100644 --- a/be/src/service/impala-server.h +++ b/be/src/service/impala-server.h @@ -81,6 +81,7 @@ class TGetExecSummaryReq; class ClientRequestState; class QueryDriver; struct QueryHandle; +class QueryScanner; class SimpleLogger; class UpdateFilterParamsPB; class UpdateFilterResultPB; @@ -504,6 +505,11 @@ class ImpalaServer : public ImpalaServiceIf, /// the server has started successfully. int GetHS2Port(); + /// Return the number of live queries managed by this server. Acquires + /// completed_queries_lock_ to check for completed queries that have not been written. + /// (implemented in workload-management.cc) + size_t NumLiveQueries(); + /// Returns a current snapshot of the local backend descriptor. std::shared_ptr<const BackendDescriptorPB> GetLocalBackendDescriptor(); @@ -701,6 +707,7 @@ class ImpalaServer : public ImpalaServiceIf, friend struct SessionState; friend class ImpalaServerTest; friend class QueryDriver; + friend class QueryScanner; static const string BEESWAX_SERVER_NAME; static const string HS2_SERVER_NAME; @@ -1130,6 +1137,11 @@ class ImpalaServer : public ImpalaServiceIf, /// (implemented in workload-management.cc) void CompletedQueriesThread(); + /// Returns a list of completed queries that have not yet been written to storage. + /// Acquires completed_queries_lock_ to make a copy of completed_queries_ state. + /// (implemented in workload-management.cc) + std::vector<std::shared_ptr<QueryStateExpanded>> GetCompletedQueries(); + /// Called from ExpireQueries() to check query resource limits for 'crs'. If the query /// exceeded a resource limit, returns a non-OK status with information about what /// limit was exceeded. 
Returns OK if the query will continue running and expiration diff --git a/be/src/service/query-state-record.h b/be/src/service/query-state-record.h index fec77f22c..c3fec5518 100644 --- a/be/src/service/query-state-record.h +++ b/be/src/service/query-state-record.h @@ -343,7 +343,7 @@ struct QueryStateExpanded { /// Required data will be copied from the provided ClientRequestState into members of /// the struct. QueryStateExpanded(const ClientRequestState& exec_state, - const std::shared_ptr<QueryStateRecord> base_state_src); + const std::shared_ptr<QueryStateRecord> base_state_src = nullptr); }; // struct QueryStateExpanded } // namespace impala diff --git a/be/src/service/workload-management-fields.cc b/be/src/service/workload-management-fields.cc index 5f52d2bcf..19ef98fb7 100644 --- a/be/src/service/workload-management-fields.cc +++ b/be/src/service/workload-management-fields.cc @@ -64,6 +64,7 @@ static void _write_event(FieldParserContext& ctx, QueryEvent target_event) { ctx.sql << event->second; } +/// List of query table columns. 
Must be kept in-sync with SystemTables.thrift const std::list<FieldDefinition> FIELD_DEFINITIONS = { // Cluster Id // Required diff --git a/be/src/service/workload-management.cc b/be/src/service/workload-management.cc index 8545eb3e8..6394c508d 100644 --- a/be/src/service/workload-management.cc +++ b/be/src/service/workload-management.cc @@ -166,6 +166,12 @@ static const string QueryStateToSql(const QueryStateExpanded* rec) noexcept { return sql.str(); } // function QueryStateToSql +size_t ImpalaServer::NumLiveQueries() { + size_t live_queries = query_driver_map_.Count(); + std::lock_guard<std::mutex> l(completed_queries_lock_); + return live_queries + completed_queries_.size(); +} + Status ImpalaServer::InitWorkloadManagement() { if (FLAGS_enable_workload_mgmt) { return Thread::Create("impala-server", "completed-queries", @@ -415,4 +421,14 @@ void ImpalaServer::CompletedQueriesThread() { } } // ImpalaServer::CompletedQueriesThread +vector<shared_ptr<QueryStateExpanded>> ImpalaServer::GetCompletedQueries() { + lock_guard<mutex> l(completed_queries_lock_); + vector<shared_ptr<QueryStateExpanded>> results; + results.reserve(completed_queries_.size()); + for (const auto& r : completed_queries_) { + results.emplace_back(r.query); + } + return results; +} + } // namespace impala diff --git a/be/src/util/sharded-query-map-util.h b/be/src/util/sharded-query-map-util.h index 4e2836f5d..217090dc4 100644 --- a/be/src/util/sharded-query-map-util.h +++ b/be/src/util/sharded-query-map-util.h @@ -55,6 +55,16 @@ class GenericShardedQueryMap { } } + // Return number of elements in the sharded query map. + size_t Count() { + size_t count = 0; + for (int i = 0; i < NUM_QUERY_BUCKETS; ++i) { + std::lock_guard<SpinLock> l(shards_[i].map_lock_); + count += shards_[i].map_.size(); + } + return count; + } + // Adds ('key', 'value') to the map, returning an error if 'key' already exists. 
Status Add(const K& key, const V& value); diff --git a/common/protobuf/planner.proto b/common/protobuf/planner.proto index 26b4cd2fc..7807c12f1 100644 --- a/common/protobuf/planner.proto +++ b/common/protobuf/planner.proto @@ -81,4 +81,5 @@ message ScanRangePB { optional HBaseKeyRangePB hbase_key_range = 2; optional bytes kudu_scan_token = 3; optional bytes file_metadata = 4; + optional bool is_system_scan = 5; } diff --git a/common/thrift/CMakeLists.txt b/common/thrift/CMakeLists.txt index 23fc1fac3..f86036606 100644 --- a/common/thrift/CMakeLists.txt +++ b/common/thrift/CMakeLists.txt @@ -204,6 +204,7 @@ set (SRC_FILES RuntimeProfile.thrift SqlConstraints.thrift StatestoreService.thrift + SystemTables.thrift Zip.thrift ${TCLI_SERVICE_THRIFT} ${EXT_DATA_SRC_FILES} diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 3759c8a5a..37297ae3b 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -62,6 +62,8 @@ enum TTableType { // so that a materialized view will not be classified as a table. Refer to // IncompleteTable#toThrift() for further details. MATERIALIZED_VIEW = 7 + // Represents a system table reflecting backend internal state. + SYSTEM_TABLE = 8 } // TODO: Separate the storage engines (e.g. Kudu) from the file formats. @@ -667,6 +669,17 @@ struct TIcebergTable { 10: optional map<string, TIcebergPartitionStats> partition_stats; } +// Describes the purpose of a particular system table. +// Table names can be found in SystemTable.java +enum TSystemTableName { + QUERY_LIVE = 0 +} + +// Represents a System Table +struct TSystemTable { + 1: required TSystemTableName table_name +} + // Represents a table or view. struct TTable { // Name of the parent database. Case insensitive, expected to be stored as lowercase. @@ -725,6 +738,9 @@ struct TTable { // Comment of the table/view. Set only for FeIncompleteTable where msTable doesn't // exists. 
18: optional string tbl_comment + + // Set if this is a system table + 19: optional TSystemTable system_table } // Represents a database. diff --git a/common/thrift/Descriptors.thrift b/common/thrift/Descriptors.thrift index d44aab551..2acc31264 100644 --- a/common/thrift/Descriptors.thrift +++ b/common/thrift/Descriptors.thrift @@ -78,6 +78,7 @@ struct TTableDescriptor { 9: optional CatalogObjects.TDataSourceTable dataSourceTable 10: optional CatalogObjects.TKuduTable kuduTable 11: optional CatalogObjects.TIcebergTable icebergTable + 12: optional CatalogObjects.TSystemTable systemTable // Unqualified name of table 7: required string tableName diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 329fba709..fa798c721 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -54,6 +54,7 @@ enum TPlanNodeType { ICEBERG_DELETE_NODE = 18 ICEBERG_METADATA_SCAN_NODE = 19 TUPLE_CACHE_NODE = 20 + SYSTEM_TABLE_SCAN_NODE = 21 } // phases of an execution node @@ -286,6 +287,7 @@ struct TScanRange { 2: optional THBaseKeyRange hbase_key_range 3: optional binary kudu_scan_token 4: optional binary file_metadata + 5: optional bool is_system_scan } // Specification of an overlap predicate desc. 
@@ -397,6 +399,11 @@ struct TKuduScanNode { 3: optional i32 count_star_slot_offset } +struct TSystemTableScanNode { + 1: required Types.TTupleId tuple_id + 2: required CatalogObjects.TSystemTableName table_name +} + struct TEqJoinCondition { // left-hand side of "<a> = <b>" 1: required Exprs.TExpr left; @@ -770,6 +777,8 @@ struct TPlanNode { 27: optional TCardinalityCheckNode cardinality_check_node 28: optional TTupleCacheNode tuple_cache_node + + 29: optional TSystemTableScanNode system_table_scan_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index e1f64f6a9..75043a489 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -999,5 +999,8 @@ struct TQueryExecRequest { // Estimated per-host memory. The planner generates this value which may or may not be // overridden to come up with a final per-host memory estimate. 15: optional i64 planner_per_host_mem_estimate; + + // Used for system tables that need to run on all nodes. + 16: optional bool include_all_coordinators } diff --git a/common/thrift/SystemTables.thrift b/common/thrift/SystemTables.thrift new file mode 100644 index 000000000..224fcca41 --- /dev/null +++ b/common/thrift/SystemTables.thrift @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +namespace cpp impala +namespace java org.apache.impala.thrift + +# Must be kept in-sync with workload-management-fields.cc +enum TQueryTableColumn { + CLUSTER_ID + QUERY_ID + SESSION_ID + SESSION_TYPE + HIVESERVER2_PROTOCOL_VERSION + DB_USER + DB_USER_CONNECTION + DB_NAME + IMPALA_COORDINATOR + QUERY_STATUS + QUERY_STATE + IMPALA_QUERY_END_STATE + QUERY_TYPE + NETWORK_ADDRESS + START_TIME_UTC + TOTAL_TIME_NS + QUERY_OPTS_CONFIG + RESOURCE_POOL + PER_HOST_MEM_ESTIMATE + DEDICATED_COORD_MEM_ESTIMATE + PER_HOST_FRAGMENT_INSTANCES + BACKENDS_COUNT + ADMISSION_RESULT + CLUSTER_MEMORY_ADMITTED + EXECUTOR_GROUP + EXECUTOR_GROUPS + EXEC_SUMMARY + NUM_ROWS_FETCHED + ROW_MATERIALIZATION_ROWS_PER_SEC + ROW_MATERIALIZATION_TIME_NS + COMPRESSED_BYTES_SPILLED + EVENT_PLANNING_FINISHED + EVENT_SUBMIT_FOR_ADMISSION + EVENT_COMPLETED_ADMISSION + EVENT_ALL_BACKENDS_STARTED + EVENT_ROWS_AVAILABLE + EVENT_FIRST_ROW_FETCHED + EVENT_LAST_ROW_FETCHED + EVENT_UNREGISTER_QUERY + READ_IO_WAIT_TOTAL_NS + READ_IO_WAIT_MEAN_NS + BYTES_READ_CACHE_TOTAL + BYTES_READ_TOTAL + PERNODE_PEAK_MEM_MIN + PERNODE_PEAK_MEM_MAX + PERNODE_PEAK_MEM_MEAN + SQL + PLAN +} diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift index ee0803f5d..63c7b04a0 100644 --- a/common/thrift/Types.thrift +++ b/common/thrift/Types.thrift @@ -161,6 +161,11 @@ struct TNetworkAddress { 3: optional string uds_address } +// A list of network addresses +struct TAddressesList { + 1: required list<TNetworkAddress> addresses; +} + // Wire format for UniqueId struct TUniqueId { 1: required i64 hi diff 
--git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 8c2194728..cd19f4f6f 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -68,6 +68,7 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; +import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.TypeCompatibility; @@ -459,6 +460,9 @@ public class Analyzer { // select item from complextypestbl.int_array; public boolean hasTopLevelAcidCollectionTableRef = false; + // True when a SystemTableScanNode is present. + public boolean includeAllCoordinatorsInScheduling = false; + // all registered conjuncts (map from expr id to conjunct). We use a LinkedHashMap to // preserve the order in which conjuncts are added. public final Map<ExprId, Expr> conjuncts = new LinkedHashMap<>(); @@ -692,6 +696,14 @@ public class Analyzer { } } + public boolean includeAllCoordinatorsInScheduling() { + return globalState_.includeAllCoordinatorsInScheduling; + } + + public void setIncludeAllCoordinatorsInScheduling(boolean flag) { + globalState_.includeAllCoordinatorsInScheduling = flag; + } + // An analyzer stores analysis state for a single select block. A select block can be // a top level select statement, or an inline view select block. 
// ancestors contains the Analyzers of the enclosing select blocks of 'this' @@ -974,7 +986,8 @@ public class Analyzer { Preconditions.checkState(table instanceof FeFsTable || table instanceof FeKuduTable || table instanceof FeHBaseTable || - table instanceof FeDataSourceTable); + table instanceof FeDataSourceTable || + table instanceof SystemTable); return new BaseTableRef(tableRef, resolvedPath); } else { return new CollectionTableRef(tableRef, resolvedPath, false); diff --git a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java index 37ac060bf..8e6442f39 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/DescribeTableStmt.java @@ -24,6 +24,7 @@ import org.apache.impala.analysis.Path.PathType; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.StructType; +import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; @@ -138,6 +139,7 @@ public class DescribeTableStmt extends StatementBase { analyzer.getTable(table_.getTableName(), /* add column-level privilege */ true, Privilege.ANY); checkMinimalForIcebergMetadataTable(); + checkMinimalForSystemTable(); if (!targetsTable()) analyzeComplexType(analyzer); } @@ -174,6 +176,13 @@ public class DescribeTableStmt extends StatementBase { } } + private void checkMinimalForSystemTable() throws AnalysisException { + if (table_ instanceof SystemTable && outputStyle_ != TDescribeOutputStyle.MINIMAL) { + throw new AnalysisException( + "DESCRIBE FORMATTED|EXTENDED cannot refer to a system table."); + } + } + public TDescribeTableParams toThrift() { TDescribeTableParams params = new TDescribeTableParams(); params.setOutput_style(outputStyle_); 
diff --git a/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java index 32ebceffa..87e4c95e3 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ShowCreateTableStmt.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.impala.authorization.Privilege; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.FeView; +import org.apache.impala.catalog.SystemTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TCatalogObjectType; import org.apache.impala.thrift.TTableName; @@ -76,6 +77,8 @@ public class ShowCreateTableStmt extends StatementBase { // statement references a column by its implicitly defined column names. viewAnalyzer.setUseHiveColLabels(true); viewQuery.analyze(viewAnalyzer); + } else if (table instanceof SystemTable) { + throw new AnalysisException("Not supported on system tables."); } } diff --git a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java index 6badb909b..d4ea0df53 100644 --- a/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java +++ b/fe/src/main/java/org/apache/impala/authorization/BaseAuthorizationChecker.java @@ -304,6 +304,7 @@ public abstract class BaseAuthorizationChecker implements AuthorizationChecker { * Throws an AuthorizationException if the dbName is a system db * and the user is trying to modify it. * Returns true if this is a system db and the action is allowed. + * Returns false if authorization should be checked in the usual way.
*/ private boolean checkSystemDbAccess(FeCatalog catalog, String dbName, Privilege privilege) @@ -314,6 +315,9 @@ public abstract class BaseAuthorizationChecker implements AuthorizationChecker { case VIEW_METADATA: case ANY: return true; + case SELECT: + // Check authorization for SELECT on system tables in the usual way. + return false; default: throw new AuthorizationException("Cannot modify system database."); } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index b0f9d5181..689bb233b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -381,7 +381,8 @@ public class CatalogServiceCatalog extends Catalog { .getBackendCfg().topic_update_tbl_max_wait_time_ms; Preconditions.checkState(topicUpdateTblLockMaxWaitTimeMs_ >= 0, "topic_update_tbl_max_wait_time_ms must be positive"); - impalaSysTables = Arrays.asList(BackendConfig.INSTANCE.queryLogTableName()); + impalaSysTables = Arrays.asList( + BackendConfig.INSTANCE.queryLogTableName(), SystemTable.QUERY_LIVE); tableLoadingMgr_ = new TableLoadingMgr(this, numLoadingThreads); loadInBackground_ = loadInBackground; try { @@ -440,7 +441,7 @@ public class CatalogServiceCatalog extends Catalog { */ public boolean isBlacklistedDb(String dbName) { Preconditions.checkNotNull(dbName); - if (BackendConfig.INSTANCE.enableWorkloadMgmt() && dbName.equalsIgnoreCase("sys")) { + if (BackendConfig.INSTANCE.enableWorkloadMgmt() && dbName.equalsIgnoreCase(Db.SYS)) { // Override 'sys' for Impala system tables. 
return false; } @@ -452,7 +453,7 @@ public class CatalogServiceCatalog extends Catalog { */ public boolean isBlacklistedTable(TableName table) { Preconditions.checkNotNull(table); - if (table.getDb().equalsIgnoreCase("sys") && blacklistedDbs_.contains("sys")) { + if (table.getDb().equalsIgnoreCase(Db.SYS) && blacklistedDbs_.contains(Db.SYS)) { // If we've overridden the database blacklist, only allow Impala system tables. return !impalaSysTables.contains(table.getTbl()); } diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java index 919a71e3f..189edaff8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Db.java +++ b/fe/src/main/java/org/apache/impala/catalog/Db.java @@ -35,6 +35,7 @@ import org.apache.impala.analysis.KuduPartitionParam; import org.apache.impala.catalog.events.InFlightEvents; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TBriefTableMeta; import org.apache.impala.thrift.TCatalogObject; import org.apache.impala.thrift.TCatalogObjectType; @@ -82,6 +83,9 @@ public class Db extends CatalogObjectImpl implements FeDb { public static final String FUNCTION_INDEX_PREFIX = "impala_registered_function_"; + // Name of the standard system DB. Also used by Hive MetaStore. + public static final String SYS = "sys"; + // Hive metastore imposes a limit of 4000 bytes on the key and value strings // in DB parameters map. We need ensure that this limit isn't crossed // while serializing functions to the metastore. @@ -129,6 +133,13 @@ public class Db extends CatalogObjectImpl implements FeDb { setMetastoreDb(name, msDb); tableCache_ = new CatalogObjectCache<>(); functions_ = new HashMap<>(); + + // This constructor is called from a static initializer in tests. 
+ if (BackendConfig.INSTANCE != null && BackendConfig.INSTANCE.enableWorkloadMgmt() && + name.equalsIgnoreCase(SYS)) { + // Add built-in tables. + addTable(SystemTable.CreateQueryLiveTable(this, getOwnerUser())); + } } public long getCreateEventId() { return createEventId_; } diff --git a/fe/src/main/java/org/apache/impala/catalog/SystemTable.java b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java new file mode 100644 index 000000000..19129a508 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/catalog/SystemTable.java @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.catalog; + +import static org.apache.impala.analysis.Analyzer.ACCESSTYPE_READ; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.impala.common.InternalException; +import org.apache.impala.compat.MetastoreShim; +import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.TCatalogObjectType; +import org.apache.impala.thrift.TColumn; +import org.apache.impala.thrift.TQueryTableColumn; +import org.apache.impala.thrift.TResultSet; +import org.apache.impala.thrift.TResultSetMetadata; +import org.apache.impala.thrift.TSystemTable; +import org.apache.impala.thrift.TSystemTableName; +import org.apache.impala.thrift.TTable; +import org.apache.impala.thrift.TTableDescriptor; +import org.apache.impala.thrift.TTableType; +import org.apache.impala.util.TResultRowBuilder; +import com.google.common.collect.ImmutableMap; +import com.google.common.base.Preconditions; + +/** + * Represents a system table reflecting backend internal state. 
+ */ +public final class SystemTable extends Table { + public static final String QUERY_LIVE = "impala_query_live"; + private static final Map<String, TSystemTableName> SYSTEM_TABLE_NAME_MAP = + ImmutableMap.of(QUERY_LIVE, TSystemTableName.QUERY_LIVE); + + private SystemTable(org.apache.hadoop.hive.metastore.api.Table msTable, Db db, + String name, String owner) { + super(msTable, db, name, owner); + } + + // Get Type for a TQueryTableColumn + private static Type getColumnType(TQueryTableColumn column) { + switch (column) { + case START_TIME_UTC: + return Type.TIMESTAMP; + case TOTAL_TIME_NS: + case PER_HOST_MEM_ESTIMATE: + case DEDICATED_COORD_MEM_ESTIMATE: + case CLUSTER_MEMORY_ADMITTED: + case NUM_ROWS_FETCHED: + case ROW_MATERIALIZATION_ROWS_PER_SEC: + case ROW_MATERIALIZATION_TIME_NS: + case COMPRESSED_BYTES_SPILLED: + case EVENT_PLANNING_FINISHED: + case EVENT_SUBMIT_FOR_ADMISSION: + case EVENT_COMPLETED_ADMISSION: + case EVENT_ALL_BACKENDS_STARTED: + case EVENT_ROWS_AVAILABLE: + case EVENT_FIRST_ROW_FETCHED: + case EVENT_LAST_ROW_FETCHED: + case EVENT_UNREGISTER_QUERY: + case READ_IO_WAIT_TOTAL_NS: + case READ_IO_WAIT_MEAN_NS: + case BYTES_READ_CACHE_TOTAL: + case BYTES_READ_TOTAL: + case PERNODE_PEAK_MEM_MIN: + case PERNODE_PEAK_MEM_MAX: + case PERNODE_PEAK_MEM_MEAN: + return Type.BIGINT; + case BACKENDS_COUNT: + return Type.INT; + default: + return Type.STRING; + } + } + + public static SystemTable CreateQueryLiveTable(Db db, String owner) { + List<FieldSchema> fsList = new ArrayList<FieldSchema>(); + for (TQueryTableColumn column : TQueryTableColumn.values()) { + // The type string must be lowercase for Hive to read the column metadata properly. 
+ String typeSql = getColumnType(column).toSql().toLowerCase(); + FieldSchema fs = new FieldSchema(column.name().toLowerCase(), typeSql, ""); + fsList.add(fs); + } + org.apache.hadoop.hive.metastore.api.Table msTable = + createMetastoreTable(db.getName(), QUERY_LIVE, owner, fsList); + + SystemTable table = new SystemTable(msTable, db, QUERY_LIVE, owner); + for (TQueryTableColumn column : TQueryTableColumn.values()) { + table.addColumn(new Column( + column.name().toLowerCase(), getColumnType(column), column.ordinal())); + } + return table; + } + + public TSystemTableName getSystemTableName() { + return SYSTEM_TABLE_NAME_MAP.get(getName()); + } + + @Override + public TTableDescriptor toThriftDescriptor(int tableId, + Set<Long> referencedPartitions) { + // Create thrift descriptors to send to the BE. + TTableDescriptor tableDescriptor = + new TTableDescriptor(tableId, TTableType.SYSTEM_TABLE, getTColumnDescriptors(), + numClusteringCols_, name_, db_.getName()); + tableDescriptor.setSystemTable(getTSystemTable()); + return tableDescriptor; + } + + @Override + public long getNumRows() { + try { + // Return an estimate of the number of live queries assuming balanced load across + // coordinators. + return FeSupport.NumLiveQueries() * FeSupport.GetCoordinators().getAddressesSize(); + } catch (InternalException e) { + return super.getNumRows(); + } + } + + /** + * Returns a thrift structure for the system table. + */ + private TSystemTable getTSystemTable() { + return new TSystemTable(getSystemTableName()); + } + + @Override + public TCatalogObjectType getCatalogObjectType() { + return TCatalogObjectType.TABLE; + } + + @Override + public void load(boolean reuseMetadata, IMetaStoreClient client, + org.apache.hadoop.hive.metastore.api.Table msTbl, String reason) + throws TableLoadingException { + // Table is always loaded. + Preconditions.checkState(false); + } + + /** + * Returns a thrift structure representing the table. 
+ */ + @Override + public TTable toThrift() { + TTable table = super.toThrift(); + table.setTable_type(TTableType.SYSTEM_TABLE); + table.setSystem_table(getTSystemTable()); + return table; + } + + /** + * Returns statistics on this table as a tabular result set. Used for the SHOW + * TABLE STATS statement. The schema of the returned TResultSet is set inside + * this method. + */ + public TResultSet getTableStats() { + TResultSet result = new TResultSet(); + TResultSetMetadata resultSchema = new TResultSetMetadata(); + resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift())); + result.setSchema(resultSchema); + TResultRowBuilder rowBuilder = new TResultRowBuilder(); + rowBuilder.add(getNumRows()); + result.addToRows(rowBuilder.get()); + return result; + } + + private static org.apache.hadoop.hive.metastore.api.Table + createMetastoreTable(String dbName, String tableName, String owner, + List<FieldSchema> columns) { + // Based on CatalogOpExecutor#createMetaStoreTable + org.apache.hadoop.hive.metastore.api.Table tbl = + new org.apache.hadoop.hive.metastore.api.Table(); + tbl.setDbName(dbName); + tbl.setTableName(tableName); + tbl.setOwner(owner); + tbl.setParameters(new HashMap<String, String>()); + tbl.setTableType(TableType.MANAGED_TABLE.toString()); + tbl.setPartitionKeys(new ArrayList<FieldSchema>()); + if (MetastoreShim.getMajorVersion() > 2) { + MetastoreShim.setTableAccessType(tbl, ACCESSTYPE_READ); + } + + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new org.apache.hadoop.hive.metastore.api.SerDeInfo()); + sd.getSerdeInfo().setParameters(new HashMap<>()); + sd.setCompressed(false); + sd.setBucketCols(new ArrayList<>(0)); + sd.setSortCols(new ArrayList<>(0)); + sd.setCols(columns); + tbl.setSd(sd); + + return tbl; + } +} diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 413e4949a..566b10272 100644 --- 
a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -70,6 +70,7 @@ import org.apache.impala.catalog.FeKuduTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.ScalarType; +import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; @@ -1894,13 +1895,18 @@ public class SingleNodePlanner { scanNode.addConjuncts(conjuncts); scanNode.init(analyzer); return scanNode; - } else if (tblRef.getTable() instanceof FeKuduTable) { + } else if (table instanceof FeKuduTable) { scanNode = new KuduScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts, aggInfo, tblRef); scanNode.init(analyzer); return scanNode; } else if (table instanceof IcebergMetadataTable) { return createIcebergMetadataScanNode(tblRef, conjuncts, analyzer); + } else if (table instanceof SystemTable) { + scanNode = new SystemTableScanNode(ctx_.getNextNodeId(), tblRef.getDesc()); + scanNode.addConjuncts(conjuncts); + scanNode.init(analyzer); + return scanNode; } else { throw new NotImplementedException( "Planning not implemented for table class: " + table.getClass()); diff --git a/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java b/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java new file mode 100644 index 000000000..ebba72849 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/planner/SystemTableScanNode.java @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.planner; + +import java.util.List; + +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.catalog.SystemTable; +import org.apache.impala.common.ImpalaException; +import org.apache.impala.common.InternalException; +import org.apache.impala.service.FeSupport; +import org.apache.impala.thrift.TAddressesList; +import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.thrift.TPlanNode; +import org.apache.impala.thrift.TPlanNodeType; +import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.thrift.TScanRange; +import org.apache.impala.thrift.TScanRangeLocation; +import org.apache.impala.thrift.TScanRangeLocationList; +import org.apache.impala.thrift.TScanRangeSpec; +import org.apache.impala.thrift.TSystemTableScanNode; +import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; + +public class SystemTableScanNode extends ScanNode { + public SystemTableScanNode(PlanNodeId id, TupleDescriptor desc) { + super(id, desc, "SCAN SYSTEM_TABLE"); + table_ = (SystemTable) desc_.getTable(); + } + + private final SystemTable table_; + + @Override + public void init(Analyzer analyzer) throws ImpalaException { + assignConjuncts(analyzer); + 
analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_); + conjuncts_ = orderConjunctsByCost(conjuncts_); + // materialize slots in remaining conjuncts_ + analyzer.materializeSlots(conjuncts_); + computeMemLayout(analyzer); + computeScanRangeLocations(analyzer); + computeStats(analyzer); + } + + /** + * Create a single scan range for each coordinator in cluster. + */ + private void computeScanRangeLocations(Analyzer analyzer) throws InternalException { + TAddressesList coordinators_container = FeSupport.GetCoordinators(); + List<TNetworkAddress> coordinators = coordinators_container.getAddresses(); + + scanRangeSpecs_ = new TScanRangeSpec(); + for (TNetworkAddress networkAddress: coordinators) { + // Translate from network address to the global (to this request) host index. + int globalHostIdx = analyzer.getHostIndex().getOrAddIndex(networkAddress); + TScanRange range = new TScanRange(); + // Enable scheduling scan ranges to coordinators independent of the executor group + // or whether they're provisioned as executors. Currently only addresses + // coordinators, but could be expanded to addressing all impalad backends for other + // system tables. + range.setIs_system_scan(true); + scanRangeSpecs_.addToConcrete_ranges(new TScanRangeLocationList( + range, Lists.newArrayList(new TScanRangeLocation(globalHostIdx)))); + } + analyzer.setIncludeAllCoordinatorsInScheduling(true); + } + + @Override + public void computeProcessingCost(TQueryOptions queryOptions) { + // Only cost is serialization as we're grabbing existing data from memory. 
+ processingCost_ = ProcessingCost.zero(); + } + + @Override + public void computeStats(Analyzer analyzer) { + super.computeStats(analyzer); + inputCardinality_ = FeSupport.NumLiveQueries(); + cardinality_ = inputCardinality_; + cardinality_ = applyConjunctsSelectivity(cardinality_); + cardinality_ = Math.max(1, cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); + numInstances_ = numNodes_ = scanRangeSpecs_.getConcrete_rangesSize(); + } + + @Override + protected String debugString() { + return MoreObjects.toStringHelper(this) + .add("tid", desc_.getId().asInt()) + .add("TblName", desc_.getTable().getFullName()) + .addValue(super.debugString()) + .toString(); + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.SYSTEM_TABLE_SCAN_NODE; + msg.system_table_scan_node = + new TSystemTableScanNode(desc_.getId().asInt(), table_.getSystemTableName()); + } + + @Override + public void computeNodeResourceProfile(TQueryOptions queryOptions) { + // Resource requirements are low since we're grabbing existing data from memory. + nodeResourceProfile_ = ResourceProfile.noReservation(1024L * 1024L); + } + + @Override + protected String getNodeExplainString( + String prefix, String detailPrefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + String aliasStr = ""; + if (!table_.getFullName().equalsIgnoreCase(desc_.getAlias()) + && !table_.getName().equalsIgnoreCase(desc_.getAlias())) { + aliasStr = " " + desc_.getAlias(); + } + + output.append(String.format("%s%s:%s [%s%s]\n", prefix, id_.toString(), displayName_, + table_.getFullName(), aliasStr)); + + if (!conjuncts_.isEmpty()) { + output.append(prefix + "predicates: " + + Expr.getExplainString(conjuncts_, detailLevel) + "\n"); + } + + // Add table and column stats in verbose mode. 
+ if (detailLevel == TExplainLevel.VERBOSE) { + output.append(getStatsExplainString(prefix)); + output.append("\n"); + } + return output.toString(); + } +} diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index ed93a37f8..c2b57972a 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -484,6 +484,11 @@ public class BackendConfig { return backendCfg_.enable_workload_mgmt; } + @VisibleForTesting + public void setEnableWorkloadMgmt(boolean enableWorkloadMgmt) { + backendCfg_.enable_workload_mgmt = enableWorkloadMgmt; + } + public String queryLogTableName() { return backendCfg_.query_log_table_name; } diff --git a/fe/src/main/java/org/apache/impala/service/FeSupport.java b/fe/src/main/java/org/apache/impala/service/FeSupport.java index 4b56ca15c..ed72ba2ff 100644 --- a/fe/src/main/java/org/apache/impala/service/FeSupport.java +++ b/fe/src/main/java/org/apache/impala/service/FeSupport.java @@ -28,6 +28,7 @@ import org.apache.impala.analysis.SlotRef; import org.apache.impala.analysis.TableName; import org.apache.impala.common.InternalException; import org.apache.impala.common.Pair; +import org.apache.impala.thrift.TAddressesList; import org.apache.impala.thrift.TCacheJarParams; import org.apache.impala.thrift.TCacheJarResult; import org.apache.impala.thrift.TCatalogObject; @@ -148,6 +149,12 @@ public class FeSupport { // Does an RPC to the Catalog Server to get the latest compactions. public native static byte[] NativeGetLatestCompactions(byte[] thriftReq); + // Get a list of addresses for coordinators. + public native static byte[] NativeGetCoordinators(); + + // Get the number of live queries. + public native static long NativeNumLiveQueries(); + /** * Locally caches the jar at the specified HDFS location. 
* @@ -532,6 +539,23 @@ public class FeSupport { return NativeGetLatestCompactions(thriftReq); } + public static TAddressesList GetCoordinators() throws InternalException { + try { + byte[] result = NativeGetCoordinators(); + Preconditions.checkNotNull(result); + TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); + TAddressesList coordinators_list = new TAddressesList(); + deserializer.deserialize(coordinators_list, result); + return coordinators_list; + } catch (TException e) { + throw new InternalException("Error getting coordinators", e); + } + } + + public static long NumLiveQueries() { + return NativeNumLiveQueries(); + } + /** * Calling this function before loadLibrary() causes external frontend * initialization to be used during NativeFeInit() diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 4d9e0c55c..f68ecb2eb 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -127,6 +127,7 @@ import org.apache.impala.catalog.MaterializedViewHdfsTable; import org.apache.impala.catalog.MetaStoreClientPool; import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.catalog.StructType; +import org.apache.impala.catalog.SystemTable; import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.local.InconsistentMetadataFetchException; @@ -1596,6 +1597,8 @@ public class Frontend { } } else if (table instanceof MaterializedViewHdfsTable) { return ((MaterializedViewHdfsTable) table).getTableStats(); + } else if (table instanceof SystemTable) { + return ((SystemTable) table).getTableStats(); } else { throw new InternalException("Invalid table class: " + table.getClass()); } @@ -1879,6 +1882,10 @@ public class Frontend { && !planner.getAnalysisResult().getAnalyzer().hasPlanHints(); 
queryCtx.setDisable_spilling(disableSpilling); + if (planner.getAnalysisResult().getAnalyzer().includeAllCoordinatorsInScheduling()) { + result.setInclude_all_coordinators(true); + } + // assign fragment idx int idx = 0; for (TPlanExecInfo planExecInfo: result.plan_exec_info) { diff --git a/fe/src/main/java/org/apache/impala/util/CatalogBlacklistUtils.java b/fe/src/main/java/org/apache/impala/util/CatalogBlacklistUtils.java index 113dd7788..b8777e320 100644 --- a/fe/src/main/java/org/apache/impala/util/CatalogBlacklistUtils.java +++ b/fe/src/main/java/org/apache/impala/util/CatalogBlacklistUtils.java @@ -18,6 +18,7 @@ package org.apache.impala.util; import org.apache.impala.analysis.TableName; +import org.apache.impala.catalog.Db; import java.util.Set; @@ -94,8 +95,8 @@ public class CatalogBlacklistUtils { } public static void verifyDbName(String dbName) throws AnalysisException { - if (BackendConfig.INSTANCE.enableWorkloadMgmt() && dbName.equalsIgnoreCase("sys")) { - // Override 'sys' for Impala system tables. + if (BackendConfig.INSTANCE.enableWorkloadMgmt() && dbName.equalsIgnoreCase(Db.SYS)) { + // Override system DB for Impala system tables. return; } if (BLACKLISTED_DBS.contains(dbName)) { diff --git a/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java b/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java new file mode 100644 index 000000000..966ce5975 --- /dev/null +++ b/fe/src/test/java/org/apache/impala/catalog/SystemTableTest.java @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.catalog; + +import org.apache.impala.catalog.Db; +import org.apache.impala.catalog.SystemTable; +import org.apache.impala.common.FrontendTestBase; +import org.apache.impala.thrift.TSystemTableName; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the SystemTable class + */ +public class SystemTableTest extends FrontendTestBase { + @Test + public void testSystemTableNames() { + Db sysDb = feFixture_.addTestDb(Db.SYS, "system db"); + SystemTable queryLiveTable = SystemTable.CreateQueryLiveTable(sysDb, "impala"); + assertEquals(TSystemTableName.QUERY_LIVE, queryLiveTable.getSystemTableName()); + } +} diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 6990ecd03..8695e9551 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -34,6 +34,7 @@ import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaException; import org.apache.impala.datagenerator.HBaseTestDataRegionAssignment; import org.apache.impala.planner.IcebergScanPlanner; +import org.apache.impala.service.BackendConfig; import org.apache.impala.service.Frontend.PlanCtx; import org.apache.impala.testutil.TestUtils; import org.apache.impala.testutil.TestUtils.IgnoreValueFilter; @@ -1535,4 +1536,16 @@ public class PlannerTest extends PlannerTestBase { Lists.newArrayList(2, 3)), IcebergScanPlanner.getOrderedEqualityFieldIds(inp)); } + 
+ /** + * Test queries against sys.impala_query_live. + */ + @Test + public void testQueryLive() { + boolean savedEnableWorkloadMgmt = BackendConfig.INSTANCE.enableWorkloadMgmt(); + BackendConfig.INSTANCE.setEnableWorkloadMgmt(true); + addTestDb(Db.SYS, "ensure system db"); + runPlannerTestFile("impala-query-live"); + BackendConfig.INSTANCE.setEnableWorkloadMgmt(savedEnableWorkloadMgmt); + } } diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index 023a6147c..eee094dea 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -362,12 +362,13 @@ public class PlannerTestBase extends FrontendTestBase { * Extracts and returns the expected error message from expectedPlan. * Returns null if expectedPlan is empty or its first element is not an error message. * The accepted format for error messages is the exception string. We currently - * support only NotImplementedException and InternalException. + * support only NotImplementedException, InternalException, and AnalysisException. */ private String getExpectedErrorMessage(ArrayList<String> expectedPlan) { if (expectedPlan == null || expectedPlan.isEmpty()) return null; if (!expectedPlan.get(0).contains("NotImplementedException") && - !expectedPlan.get(0).contains("InternalException")) return null; + !expectedPlan.get(0).contains("InternalException") && + !expectedPlan.get(0).contains("AnalysisException")) return null; return expectedPlan.get(0).trim(); } diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/impala-query-live.test b/testdata/workloads/functional-planner/queries/PlannerTest/impala-query-live.test new file mode 100644 index 000000000..343ee806d --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/impala-query-live.test @@ -0,0 +1,34 @@ +# Can query impala_query_live. 
Skips DISTRIBUTEDPLAN because that requires +# coordinators, which are not configured in frontend tests. +select count(*) from sys.impala_query_live +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| row-size=8B cardinality=1 +| +00:SCAN SYSTEM_TABLE [sys.impala_query_live] + row-size=0B cardinality=1 +==== +# Error trying to create new sys.impala_query_live +create table sys.impala_query_live (i int) +---- PLAN +AnalysisException: Table already exists: sys.impala_query_live +==== +drop table sys.impala_query_live +---- PLAN +AnalysisException: Write not supported. Table sys.impala_query_live access type is: READONLY +==== +insert into sys.impala_query_live values (1) +---- PLAN +AnalysisException: Write not supported. Table sys.impala_query_live access type is: READONLY +==== +update sys.impala_query_live set query_id = "nonsense" +---- PLAN +AnalysisException: Impala only supports modifying Kudu and Iceberg tables, but the following table is neither: sys.impala_query_live +==== +delete sys.impala_query_live where query_id = "nonsense" +---- PLAN +AnalysisException: Impala only supports modifying Kudu and Iceberg tables, but the following table is neither: sys.impala_query_live +==== diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py new file mode 100644 index 000000000..ee8c17e0c --- /dev/null +++ b/tests/custom_cluster/test_query_live.py @@ -0,0 +1,331 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import, division, print_function + +import re + +from beeswaxd.BeeswaxService import QueryState +from getpass import getuser +from signal import SIGRTMIN +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.impala_cluster import DEFAULT_KRPC_PORT +from tests.util.workload_management import assert_query +from time import sleep + + +class TestQueryLive(CustomClusterTestSuite): + """Tests to assert the query live table is correctly populated.""" + + def assert_impalads(self, profile, present=[0, 1, 2], absent=[]): + for port_idx in present: + assert ":" + str(DEFAULT_KRPC_PORT + port_idx) + ":" in profile + for port_idx in absent: + assert ":" + str(DEFAULT_KRPC_PORT + port_idx) not in profile + + def assert_only_coordinators(self, profile, coords=[0, 1], execs=[2]): + self.assert_impalads(profile, coords, execs) + assert "SYSTEM_TABLE_SCAN_NODE (id=0) [{} instances]".format(len(coords)) in profile + + def assert_fragment_instances(self, profile, expected): + """Asserts that the per host number of fragment instances is as expected.""" + hosts = ['{}({})'.format(DEFAULT_KRPC_PORT + i, expect) + for i, expect in enumerate(expected)] + actual_hosts = re.search(r'Per Host Number of Fragment Instances: (.*)', profile) + assert actual_hosts is not None + # Split and remove hostname + actual_hosts = [host.split(':')[1] for host in actual_hosts.group(1).split(' ')] + assert len(hosts) == len(actual_hosts) + for host in hosts: + if host in actual_hosts: + actual_hosts.remove(host) + else: + 
assert False, "did not find host {}".format(host) + assert len(actual_hosts) == 0, "did not find all expected hosts" + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt") + def test_query_live(self): + """Asserts the query live table shows and allows filtering queries.""" + # Use a query that reads data from disk for the 1st one, as more representative and a + # better fit for assert_query. + result1 = self.client.execute("select * from functional.alltypes", + fetch_profile_after_close=True) + assert_query('sys.impala_query_live', self.client, 'test_query_live', + result1.runtime_profile) + + # pending queries + result2 = self.execute_query( + "select query_id, cluster_id from sys.impala_query_live order by start_time_utc") + assert len(result2.data) == 3 + assert "{}\t{}".format(result1.query_id, "test_query_live") == result2.data[0] + assert "{}\t{}".format(result2.query_id, "test_query_live") == result2.data[2] + # Expect new metrics for the impala_query_live table scanner. 
+ assert "ActiveQueryCollectionTime: " in result2.runtime_profile + assert "PendingQueryCollectionTime: " in result2.runtime_profile + + # query filtering + result3 = self.execute_query( + "select query_id from sys.impala_query_live " + "where total_time_ns > 0.0 order by start_time_utc") + assert len(result3.data) == 4 + assert result1.query_id == result3.data[0] + assert result2.query_id == result3.data[2] + assert result3.query_id == result3.data[3] + + result4 = self.execute_query( + "select db_name, db_user, count(*) as query_count from sys.impala_query_live " + "group by db_name, db_user order by db_name") + assert len(result4.data) == 1 + assert "default\t{}\t5".format(getuser()) == result4.data[0] + + result5 = self.execute_query( + 'select * from sys.impala_query_live where cluster_id = "test_query_live_0"') + assert len(result5.data) == 0 + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt", + cluster_size=3, + num_exclusive_coordinators=2) + def test_dedicated_coordinators(self): + """Asserts scans are performed only on coordinators.""" + # Use a query that reads data from disk for the 1st one, as more representative and a + # better fit for assert_query. 
+ result = self.client.execute("select * from functional.alltypes", + fetch_profile_after_close=True) + assert_query('sys.impala_query_live', self.client, 'test_query_live', + result.runtime_profile) + + client2 = self.create_client_for_nth_impalad(1) + query = "select query_id, impala_coordinator from sys.impala_query_live " \ + "order by start_time_utc" + handle1 = self.execute_query_async(query) + handle2 = client2.execute_async(query) + result1 = self.client.fetch(query, handle1) + result2 = client2.fetch(query, handle2) + assert len(result1.data) == 4 + assert result1.data == result2.data + + profile1 = self.client.get_runtime_profile(handle1) + self.assert_only_coordinators(profile1) + profile2 = client2.get_runtime_profile(handle2) + self.assert_only_coordinators(profile2) + + self.close_query(handle1) + client2.close_query(handle2) + client2.close() + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt", + cluster_size=3, + num_exclusive_coordinators=2) + def test_executor_groups(self): + """Asserts scans are performed only on coordinators with multiple executor groups.""" + # Add a (non-dedicated) coordinator and executor in a different executor group. 
+ self._start_impala_cluster(options=['--impalad_args=--executor_groups=extra'], + cluster_size=1, + add_executors=True, + expected_num_impalads=4) + + result = self.client.execute( + "select query_id, impala_coordinator from sys.impala_query_live", + fetch_profile_after_close=True) + assert len(result.data) == 1 + self.assert_only_coordinators(result.runtime_profile, coords=[0, 1], execs=[2, 3]) + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt") + def test_query_entries_are_unique(self): + """Asserts queries in the query live table are unique.""" + # Start a query and close it with a delay between CloseClientRequestState and + # Unregister. Then query sys.impala_query_live from multiple coordinators. + client2 = self.create_client_for_nth_impalad(1) + + async_handle = self.execute_query_async("select count(*) from functional.alltypes", + {"debug_action": "CLOSED_NOT_UNREGISTERED:SLEEP@1000"}) + self.close_query(async_handle) + + query = "select * from sys.impala_query_live order by start_time_utc" + handle = self.execute_query_async(query) + handle2 = client2.execute_async(query) + result = self.client.fetch(query, handle) + result2 = client2.fetch(query, handle2) + assert len(result.data) == 3 + assert result.data[0] == result2.data[0] + + def remove_dynamic_fields(fields): + # Excludes QUERY_STATE, IMPALA_QUERY_END_STATE, QUERY_TYPE, TOTAL_TIME_NS, and + # everything after QUERY_OPTS_CONFIG as they change over the course of compiling + # and running the query. + return fields[:10] + fields[13:15] + fields[16:17] + + # Compare cluster_id and query_id. Not all fields will match as they're queried on a + # live query at different times. Order is only guaranteed with minicluster on a + # single machine, where they use the same clock. 
+ data1a = remove_dynamic_fields(result.data[1].split('\t')) + data1b = remove_dynamic_fields(result.data[2].split('\t')) + data2a = remove_dynamic_fields(result2.data[1].split('\t')) + data2b = remove_dynamic_fields(result2.data[2].split('\t')) + assert data1a == data2a + assert data1b == data2b + + self.close_query(handle) + client2.close_query(handle2) + client2.close() + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt", + cluster_size=3, + num_exclusive_coordinators=2) + def test_missing_coordinator(self): + """Asserts scans finish if a coordinator disappears mid-schedule. Depends on + test config of statestore_heartbeat_frequency_ms=50.""" + query = "select query_id, impala_coordinator from sys.impala_query_live" + handle = self.execute_query_async(query, query_options={ + 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@3000'}) + + # Wait for query to compile and assign ranges, then kill impalad during debug delay. + self.wait_for_any_state(handle, [QueryState.COMPILED], 3) + self.cluster.impalads[1].kill() + + result = self.client.fetch(query, handle) + assert len(result.data) == 1 + expected_message = 'is no longer available for system table scan assignment' + self.assert_impalad_log_contains('WARNING', expected_message) + assert expected_message in self.client.get_runtime_profile(handle) + + self.close_query(handle) + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt") + def test_shutdown_coordinator(self): + """Asserts query fails if a coordinator disappears after scheduling. Depends on + test config of statestore_heartbeat_frequency_ms=50.""" + query = "select query_id, impala_coordinator from sys.impala_query_live" + handle = self.execute_query_async(query, query_options={ + 'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'}) + + # Wait for query to compile. 
+ self.wait_for_any_state(handle, [QueryState.COMPILED], 3) + # Ensure enough time for scheduling to assign ranges. + sleep(1) + # Kill impalad during debug delay. + self.cluster.impalads[1].kill() + + try: + self.client.fetch(query, handle) + assert False, "fetch should fail" + except Exception as e: + assert "Network error: Client connection negotiation failed" in str(e) + # Beeswax client closes the query on failure. + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt") + def test_graceful_shutdown_coordinator(self): + """Asserts query succeeds if another coordinator is shutdown gracefully after + scheduling. Depends on test config of statestore_heartbeat_frequency_ms=50.""" + query = "select query_id from sys.impala_query_live" + handle = self.execute_query_async(query, query_options={ + 'debug_action': 'CRS_BEFORE_COORD_STARTS:SLEEP@3000'}) + + # Wait for query to compile and assign ranges, then gracefully shutdown impalad. + self.wait_for_any_state(handle, [QueryState.COMPILED], 3) + self.cluster.impalads[1].kill(SIGRTMIN) + + self.wait_for_any_state(handle, [QueryState.FINISHED], 10) + # Allow time for statestore update to propagate. Shutdown grace period is 120s. + sleep(1) + # Coordinator in graceful shutdown should not be scheduled in new queries. 
+ shutdown = self.execute_query(query) + + result = self.client.fetch(query, handle) + assert len(result.data) == 1 + assert result.query_id == result.data[0] + self.client.get_runtime_profile(handle) + self.assert_impalads(self.client.get_runtime_profile(handle)) + self.close_query(handle) + + assert len(shutdown.data) == 2 + self.assert_impalads(shutdown.runtime_profile, present=[0, 2], absent=[1]) + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--query_log_write_interval_s=1 " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt", + cluster_size=3, + num_exclusive_coordinators=2) + def test_multi_table_union(self): + """Asserts only system table scan fragments are scheduled to coordinators.""" + utc_timestamp = self.execute_query('select utc_timestamp()') + assert len(utc_timestamp.data) == 1 + start_time = utc_timestamp.data[0] + + # Wait for the query to be written to the log to ensure there's something in it. + logged = self.execute_query_expect_success(self.client, + 'select count(*) from functional.alltypes') + self.cluster.get_first_impalad().service.wait_for_metric_value( + "impala-server.completed-queries.written", 1, 30, allow_greater=True) + + query = """select query_id from + (select query_id, start_time_utc from sys.impala_query_live live + where start_time_utc > "{0}" union + select query_id, start_time_utc from sys.impala_query_log + where start_time_utc > "{0}") + as history order by start_time_utc""".format(start_time) + result = self.client.execute(query, fetch_profile_after_close=True) + assert len(result.data) == 3 + assert utc_timestamp.query_id == result.data[0] + assert logged.query_id == result.data[1] + assert result.query_id == result.data[2] + + # Unions run in a single fragment, and on the union of selected scan nodes (even + # though some may not have actual scan ranges to evaluate). So all nodes are involved + # in scans.
+ self.assert_fragment_instances(result.runtime_profile, [3, 2, 2]) + + @CustomClusterTestSuite.with_args(impalad_args="--enable_workload_mgmt " + "--cluster_id=test_query_live", + catalogd_args="--enable_workload_mgmt", + cluster_size=3, + num_exclusive_coordinators=2) + def test_multi_table_join(self, unique_database): + """Asserts only system table scan fragments are scheduled to coordinators.""" + self.execute_query('create table {}.users (user string)'.format(unique_database)) + self.execute_query('insert into {}.users values ("alice"), ("bob"), ("{}")' + .format(unique_database, getuser())) + + result = self.client.execute('select straight_join count(*) from {}.users, ' + 'sys.impala_query_live where user = db_user and start_time_utc > utc_timestamp()' + .format(unique_database), fetch_profile_after_close=True) + assert len(result.data) == 1 + assert '1' == result.data[0] + + # HDFS scan runs on 1 node, System Table scan on 2 + assert 2 == result.runtime_profile.count('HDFS_SCAN_NODE') + assert 3 == result.runtime_profile.count('SYSTEM_TABLE_SCAN_NODE') + + # impala_query_live is assigned to build side, so executor has HDFS scan fragment and + # aggregation, coordinators have System Table scan fragments, and initial coordinator + # has the root fragment. + self.assert_fragment_instances(result.runtime_profile, [2, 1, 1]) diff --git a/tests/util/workload_management.py b/tests/util/workload_management.py index 85e4797b5..900e523b3 100644 --- a/tests/util/workload_management.py +++ b/tests/util/workload_management.py @@ -104,7 +104,8 @@ def assert_query(query_tbl, client, expected_cluster_id, raw_profile=None, impal profile_lines = profile_text.split("\n") # Force Impala to process the inserts to the completed queries table. - client.execute("refresh " + query_tbl) + if query_tbl != 'sys.impala_query_live': + client.execute("refresh " + query_tbl) # Assert the query was written correctly to the query log table. if max_row_size is not None:
