http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/plan-fragment-executor.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc deleted file mode 100644 index df5b0d2..0000000 --- a/be/src/runtime/plan-fragment-executor.cc +++ /dev/null @@ -1,518 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/plan-fragment-executor.h" - -#include <thrift/protocol/TDebugProtocol.h> -#include <boost/date_time/posix_time/posix_time_types.hpp> -#include <boost/unordered_map.hpp> -#include <gutil/strings/substitute.h> - -#include "codegen/llvm-codegen.h" -#include "common/logging.h" -#include "common/object-pool.h" -#include "exec/data-sink.h" -#include "exec/exchange-node.h" -#include "exec/exec-node.h" -#include "exec/hbase-table-scanner.h" -#include "exec/hdfs-scan-node.h" -#include "exec/plan-root-sink.h" -#include "exec/scan-node.h" -#include "exprs/expr.h" -#include "runtime/data-stream-mgr.h" -#include "runtime/descriptors.h" -#include "runtime/mem-tracker.h" -#include "runtime/row-batch.h" -#include "runtime/query-state.h" -#include "runtime/runtime-filter-bank.h" -#include "runtime/exec-env.h" -#include "util/container-util.h" -#include "runtime/runtime-state.h" -#include "util/cpu-info.h" -#include "util/debug-util.h" -#include "util/mem-info.h" -#include "util/parse-util.h" -#include "util/periodic-counter-updater.h" -#include "util/pretty-printer.h" - -DEFINE_bool(serialize_batch, false, "serialize and deserialize each returned row batch"); -DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds"); - -#include "common/names.h" - -namespace posix_time = boost::posix_time; -using boost::get_system_time; -using boost::system_time; -using namespace apache::thrift; -using namespace strings; - -namespace impala { - -const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage"; - -namespace { -const string OPEN_TIMER_NAME = "OpenTime"; -const string PREPARE_TIMER_NAME = "PrepareTime"; -const string EXEC_TIMER_NAME = "ExecTime"; -} - -PlanFragmentExecutor::PlanFragmentExecutor( - const ReportStatusCallback& report_status_cb) - : exec_tree_(NULL), - report_status_cb_(report_status_cb), - report_thread_active_(false), - closed_(false), - has_thread_token_(false), - timings_profile_(NULL), - root_sink_(NULL), - is_prepared_(false), - is_cancelled_(false), - per_host_mem_usage_(NULL), - rows_produced_counter_(NULL), - average_thread_tokens_(NULL), - mem_usage_sampled_counter_(NULL), - thread_usage_sampled_counter_(NULL) {} - -PlanFragmentExecutor::~PlanFragmentExecutor() { - DCHECK(!is_prepared_ || closed_); - // at this point, the 
report thread should have been stopped - DCHECK(!report_thread_active_); -} - -Status PlanFragmentExecutor::Prepare( - QueryState* query_state, const TDescriptorTable& desc_tbl, - const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) { - Status status = PrepareInternal(query_state, desc_tbl, fragment_ctx, instance_ctx); - prepared_promise_.Set(status); - if (!status.ok()) FragmentComplete(status); - return status; -} - -Status PlanFragmentExecutor::WaitForOpen() { - DCHECK(prepared_promise_.IsSet()) << "Prepare() must complete before WaitForOpen()"; - RETURN_IF_ERROR(prepared_promise_.Get()); - return opened_promise_.Get(); -} - -Status PlanFragmentExecutor::PrepareInternal( - QueryState* qs, const TDescriptorTable& tdesc_tbl, - const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) { - lock_guard<mutex> l(prepare_lock_); - DCHECK(!is_prepared_); - - if (is_cancelled_) return Status::CANCELLED; - is_prepared_ = true; - - // TODO: Break this method up. - query_id_ = qs->query_ctx().query_id; - - VLOG_QUERY << "Prepare(): instance_id=" - << PrintId(instance_ctx.fragment_instance_id); - VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx); - - // Prepare() must not return before runtime_state_ is set if is_prepared_ was - // set. Having runtime_state_.get() != NULL is a postcondition of this method in that - // case. Do not call RETURN_IF_ERROR or explicitly return before this line. - runtime_state_.reset( - new RuntimeState(qs, fragment_ctx, instance_ctx, ExecEnv::GetInstance())); - - // total_time_counter() is in the runtime_state_ so start it up now. - SCOPED_TIMER(profile()->total_time_counter()); - timings_profile_ = obj_pool()->Add( - new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings")); - profile()->AddChild(timings_profile_); - SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME)); - - // reservation or a query option. 
- int64_t bytes_limit = -1; - if (runtime_state_->query_options().__isset.mem_limit && - runtime_state_->query_options().mem_limit > 0) { - bytes_limit = runtime_state_->query_options().mem_limit; - VLOG_QUERY << "Using query memory limit from query options: " - << PrettyPrinter::Print(bytes_limit, TUnit::BYTES); - } - - DCHECK(!instance_ctx.request_pool.empty()); - RETURN_IF_ERROR(runtime_state_->CreateBlockMgr()); - runtime_state_->InitFilterBank(); - - // Reserve one main thread from the pool - runtime_state_->resource_pool()->AcquireThreadToken(); - has_thread_token_ = true; - - average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens", - bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads), - runtime_state_->resource_pool())); - mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage", - TUnit::BYTES, - bind<int64_t>(mem_fn(&MemTracker::consumption), - runtime_state_->instance_mem_tracker())); - thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage", - TUnit::UNIT, - bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads), - runtime_state_->resource_pool())); - - // set up desc tbl - DescriptorTbl* desc_tbl = NULL; - RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), tdesc_tbl, &desc_tbl)); - runtime_state_->set_desc_tbl(desc_tbl); - VLOG_QUERY << "descriptor table for fragment=" << instance_ctx.fragment_instance_id - << "\n" << desc_tbl->DebugString(); - - // set up plan - RETURN_IF_ERROR(ExecNode::CreateTree( - runtime_state_.get(), fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_)); - runtime_state_->set_fragment_root_id(exec_tree_->id()); - - if (instance_ctx.__isset.debug_node_id) { - DCHECK(instance_ctx.__isset.debug_action); - DCHECK(instance_ctx.__isset.debug_phase); - ExecNode::SetDebugOptions(instance_ctx.debug_node_id, instance_ctx.debug_phase, - instance_ctx.debug_action, exec_tree_); - } - - // set #senders of exchange nodes before calling Prepare() - vector<ExecNode*> exch_nodes; - exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes); - for (ExecNode* exch_node : exch_nodes) { - DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE); - int num_senders = - FindWithDefault(instance_ctx.per_exch_num_senders, exch_node->id(), 0); - DCHECK_GT(num_senders, 0); - static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders); - } - - // set scan ranges - vector<ExecNode*> scan_nodes; - vector<TScanRangeParams> no_scan_ranges; - exec_tree_->CollectScanNodes(&scan_nodes); - for (int i = 0; i < scan_nodes.size(); ++i) { - ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]); - const vector<TScanRangeParams>& scan_ranges = FindWithDefault( - instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - scan_node->SetScanRanges(scan_ranges); - } - - RuntimeState* state = runtime_state(); - RuntimeProfile::Counter* prepare_timer = - ADD_CHILD_TIMER(timings_profile_, "ExecTreePrepareTime", PREPARE_TIMER_NAME); - { - SCOPED_TIMER(prepare_timer); - RETURN_IF_ERROR(exec_tree_->Prepare(state)); - } - - PrintVolumeIds(instance_ctx.per_node_scan_ranges); - - DCHECK(fragment_ctx.fragment.__isset.output_sink); - RETURN_IF_ERROR( - DataSink::CreateDataSink(obj_pool(), fragment_ctx.fragment.output_sink, - fragment_ctx.fragment.output_exprs, instance_ctx, exec_tree_->row_desc(), - &sink_)); - RETURN_IF_ERROR( - sink_->Prepare(runtime_state(), runtime_state_->instance_mem_tracker())); - - RuntimeProfile* sink_profile = sink_->profile(); - if (sink_profile != NULL) { - 
profile()->AddChild(sink_profile); - } - - if (fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) { - root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get()); - // Release the thread token on the root fragment instance. This fragment spends most - // of the time waiting and doing very little work. Holding on to the token causes - // underutilization of the machine. If there are 12 queries on this node, that's 12 - // tokens reserved for no reason. - ReleaseThreadToken(); - } - - if (state->ShouldCodegen()) { - RETURN_IF_ERROR(state->CreateCodegen()); - exec_tree_->Codegen(state); - // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed, - // ScalarFnCall has no fall back to interpretation when codegen fails so propagates - // the error status for now. - RETURN_IF_ERROR(state->CodegenScalarFns()); - } - - // set up profile counters - profile()->AddChild(exec_tree_->runtime_profile()); - rows_produced_counter_ = - ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT); - per_host_mem_usage_ = - ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES); - - row_batch_.reset(new RowBatch(exec_tree_->row_desc(), state->batch_size(), - state->instance_mem_tracker())); - VLOG(2) << "plan_root=\n" << exec_tree_->DebugString(); - return Status::OK(); -} - -Status PlanFragmentExecutor::OptimizeLlvmModule() { - if (!runtime_state_->ShouldCodegen()) return Status::OK(); - LlvmCodeGen* codegen = runtime_state_->codegen(); - DCHECK(codegen != NULL); - return codegen->FinalizeModule(); -} - -void PlanFragmentExecutor::PrintVolumeIds( - const PerNodeScanRanges& per_node_scan_ranges) { - if (per_node_scan_ranges.empty()) return; - - HdfsScanNode::PerVolumnStats per_volume_stats; - for (const PerNodeScanRanges::value_type& entry: per_node_scan_ranges) { - HdfsScanNode::UpdateHdfsSplitStats(entry.second, &per_volume_stats); - } - - stringstream str; - - HdfsScanNode::PrintHdfsSplitStats(per_volume_stats, &str); - profile()->AddInfoString(HdfsScanNode::HDFS_SPLIT_STATS_DESC, str.str()); - VLOG_FILE - << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query=" - << query_id_ << ":\n" << str.str(); -} - -Status PlanFragmentExecutor::Open() { - DCHECK(prepared_promise_.IsSet() && prepared_promise_.Get().ok()); - Status status; - { - SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME)); - VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id(); - status = OpenInternal(); - } - if (!status.ok()) FragmentComplete(status); - opened_promise_.Set(status); - return status; -} - -Status PlanFragmentExecutor::OpenInternal() { - SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); - RETURN_IF_ERROR( - runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get())); - - // We need to start the profile-reporting thread before calling exec_tree_->Open(), - // since it may block. - if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) { - unique_lock<mutex> l(report_thread_lock_); - report_thread_.reset( - new Thread("plan-fragment-executor", "report-profile", - &PlanFragmentExecutor::ReportProfileThread, this)); - // Make sure the thread started up, otherwise ReportProfileThread() might get into - // a race with StopReportThread(). 
- while (!report_thread_active_) report_thread_started_cv_.wait(l); - } - - RETURN_IF_ERROR(OptimizeLlvmModule()); - - { - SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME)); - RETURN_IF_ERROR(exec_tree_->Open(runtime_state_.get())); - } - return sink_->Open(runtime_state_.get()); -} - -Status PlanFragmentExecutor::Exec() { - DCHECK(opened_promise_.IsSet() && opened_promise_.Get().ok()); - Status status; - { - // Must go out of scope before FragmentComplete(), otherwise counter will not be - // updated by time final profile is sent. - SCOPED_TIMER(profile()->total_time_counter()); - SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME)); - status = ExecInternal(); - } - FragmentComplete(status); - return status; -} - -Status PlanFragmentExecutor::ExecInternal() { - RuntimeProfile::Counter* plan_exec_timer = - ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME); - SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); - bool exec_tree_complete = false; - do { - Status status; - row_batch_->Reset(); - { - SCOPED_TIMER(plan_exec_timer); - status = exec_tree_->GetNext( - runtime_state_.get(), row_batch_.get(), &exec_tree_complete); - } - if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::ExecInternal()"); - COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows()); - RETURN_IF_ERROR(status); - RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get())); - } while (!exec_tree_complete); - - // Flush the sink *before* stopping the report thread. Flush may need to add some - // important information to the last report that gets sent. (e.g. table sinks record the - // files they have written to in this method) - RETURN_IF_ERROR(sink_->FlushFinal(runtime_state())); - return Status::OK(); -} - -void PlanFragmentExecutor::ReportProfileThread() { - VLOG_FILE << "ReportProfileThread(): instance_id=" - << runtime_state_->fragment_instance_id(); - DCHECK(!report_status_cb_.empty()); - unique_lock<mutex> l(report_thread_lock_); - // tell Open() that we started - report_thread_active_ = true; - report_thread_started_cv_.notify_one(); - - // Jitter the reporting time of remote fragments by a random amount between - // 0 and the report_interval. This way, the coordinator doesn't get all the - // updates at once so its better for contention as well as smoother progress - // reporting. - int report_fragment_offset = rand() % FLAGS_status_report_interval; - system_time timeout = get_system_time() - + posix_time::seconds(report_fragment_offset); - // We don't want to wait longer than it takes to run the entire fragment. - stop_report_thread_cv_.timed_wait(l, timeout); - - while (report_thread_active_) { - system_time timeout = get_system_time() - + posix_time::seconds(FLAGS_status_report_interval); - - // timed_wait can return because the timeout occurred or the condition variable - // was signaled. We can't rely on its return value to distinguish between the - // two cases (e.g. there is a race here where the wait timed out but before grabbing - // the lock, the condition variable was signaled). Instead, we will use an external - // flag, report_thread_active_, to coordinate this. - stop_report_thread_cv_.timed_wait(l, timeout); - - if (VLOG_FILE_IS_ON) { - VLOG_FILE << "Reporting " << (!report_thread_active_ ? 
"final " : " ") - << "profile for instance " << runtime_state_->fragment_instance_id(); - stringstream ss; - profile()->PrettyPrint(&ss); - VLOG_FILE << ss.str(); - } - - if (!report_thread_active_) break; - SendReport(false, Status::OK()); - } - - VLOG_FILE << "exiting reporting thread: instance_id=" - << runtime_state_->fragment_instance_id(); -} - -void PlanFragmentExecutor::SendReport(bool done, const Status& status) { - DCHECK(status.ok() || done); - if (report_status_cb_.empty()) return; - - // Update the counter for the peak per host mem usage. - if (per_host_mem_usage_ != nullptr) { - per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption()); - } - - // This will send a report even if we are cancelled. If the query completed correctly - // but fragments still need to be cancelled (e.g. limit reached), the coordinator will - // be waiting for a final report and profile. - RuntimeProfile* prof = is_prepared_ ? profile() : nullptr; - report_status_cb_(status, prof, done); -} - -void PlanFragmentExecutor::StopReportThread() { - if (!report_thread_active_) return; - { - lock_guard<mutex> l(report_thread_lock_); - report_thread_active_ = false; - } - stop_report_thread_cv_.notify_one(); - report_thread_->Join(); -} - -void PlanFragmentExecutor::FragmentComplete(const Status& status) { - ReleaseThreadToken(); - StopReportThread(); - // It's safe to send final report now that the reporting thread is stopped. - SendReport(true, status); -} - -void PlanFragmentExecutor::Cancel() { - VLOG_QUERY << "Cancelling fragment instance..."; - lock_guard<mutex> l(prepare_lock_); - is_cancelled_ = true; - if (!is_prepared_) { - VLOG_QUERY << "Cancel() called before Prepare()"; - return; - } - - // Ensure that the sink is closed from both sides. Although in ordinary executions we - // rely on the consumer to do this, in error cases the consumer may not be able to send - // CloseConsumer() (see IMPALA-4348 for an example). - if (root_sink_ != nullptr) root_sink_->CloseConsumer(); - - DCHECK(runtime_state_ != NULL); - VLOG_QUERY << "Cancel(): instance_id=" << runtime_state_->fragment_instance_id(); - runtime_state_->set_is_cancelled(true); - runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id()); -} - -RuntimeProfile* PlanFragmentExecutor::profile() { - return runtime_state_->runtime_profile(); -} - -void PlanFragmentExecutor::ReleaseThreadToken() { - if (has_thread_token_) { - has_thread_token_ = false; - runtime_state_->resource_pool()->ReleaseThreadToken(true); - PeriodicCounterUpdater::StopSamplingCounter(average_thread_tokens_); - PeriodicCounterUpdater::StopTimeSeriesCounter( - thread_usage_sampled_counter_); - } -} - -void PlanFragmentExecutor::Close() { - DCHECK(!has_thread_token_); - DCHECK(!report_thread_active_); - - if (closed_) return; - if (!is_prepared_) return; - if (sink_.get() != nullptr) sink_->Close(runtime_state()); - - row_batch_.reset(); - - // Prepare should always have been called, and so runtime_state_ should be set - DCHECK(prepared_promise_.IsSet()); - if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get()); - if (mem_usage_sampled_counter_ != NULL) { - // This counter references runtime_state_->instance_mem_tracker() so must be - // stopped before calling ReleaseResources(). 
- PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_); - mem_usage_sampled_counter_ = NULL; - } - // Sanity timer checks -#ifndef NDEBUG - int64_t total_time = profile()->total_time_counter()->value(); - int64_t other_time = 0; - for (auto& name: {PREPARE_TIMER_NAME, OPEN_TIMER_NAME, EXEC_TIMER_NAME}) { - RuntimeProfile::Counter* counter = timings_profile_->GetCounter(name); - if (counter != nullptr) other_time += counter->value(); - } - // TODO: IMPALA-4631: Occasionally we see other_time = total_time + 1 for some reason - // we don't yet understand, so add 1 to total_time to avoid DCHECKing in that case. - DCHECK_LE(other_time, total_time + 1); -#endif - runtime_state_->ReleaseResources(); - - closed_ = true; -} - -}
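For readers tracing the deleted executor above: Open(), ReportProfileThread() and StopReportThread() coordinate through a start/stop handshake built from one mutex, two condition variables and the external report_thread_active_ flag, because the return value of a timed wait cannot distinguish a timeout from a signal. The standalone sketch below shows the same pattern with std:: primitives; the class and member names are illustrative only and do not appear in the Impala code base.

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Illustrative stand-in for the profile-reporting thread lifecycle; not Impala code.
class PeriodicReporter {
 public:
  ~PeriodicReporter() { Stop(); }

  void Start() {
    std::unique_lock<std::mutex> l(lock_);
    worker_ = std::thread(&PeriodicReporter::Run, this);
    // Mirror Open(): block until the worker has flagged itself active, so a later
    // Stop() cannot race with a worker that has not started waiting yet.
    started_cv_.wait(l, [this] { return active_; });
  }

  void Stop() {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (!active_) return;
      // Flip the external flag under the lock; the worker re-checks it after every
      // wakeup instead of trusting why the wait returned.
      active_ = false;
    }
    stop_cv_.notify_one();
    if (worker_.joinable()) worker_.join();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> l(lock_);
    active_ = true;
    started_cv_.notify_one();
    while (active_) {
      // Wake either because the report interval elapsed or because Stop() signalled.
      stop_cv_.wait_for(l, std::chrono::seconds(5));
      if (!active_) break;
      SendPeriodicReport();
    }
  }

  void SendPeriodicReport() { /* send one profile report */ }

  std::mutex lock_;
  std::condition_variable started_cv_;
  std::condition_variable stop_cv_;
  std::thread worker_;
  bool active_ = false;
};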
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/plan-fragment-executor.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h deleted file mode 100644 index 2194e58..0000000 --- a/be/src/runtime/plan-fragment-executor.h +++ /dev/null @@ -1,305 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - - -#ifndef IMPALA_SERVICE_PLAN_EXECUTOR_H -#define IMPALA_SERVICE_PLAN_EXECUTOR_H - -#include <vector> -#include <boost/scoped_ptr.hpp> -#include <boost/function.hpp> - -#include "common/object-pool.h" -#include "common/status.h" -#include "runtime/runtime-state.h" -#include "util/promise.h" -#include "util/runtime-profile-counters.h" -#include "util/thread.h" - -namespace impala { - -class HdfsFsCache; -class ExecNode; -class PlanRootSink; -class RowDescriptor; -class RowBatch; -class DataSink; -class DataStreamMgr; -class RuntimeProfile; -class RuntimeState; -class TRowBatch; -class TPlanExecRequest; -class TPlanFragment; -class TPlanExecParams; - -/// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment, -/// including setup and tear-down, both in the success and error case. Tear-down, which -/// happens in Close(), frees all memory allocated for this plan fragment and closes all -/// data streams. -/// -/// The lifecycle of a PlanFragmentExecutor is as follows: -/// if (Prepare().ok()) { -/// Open() -/// Exec() -/// } -/// Close() -/// -/// The executor makes an aggregated profile for the entire fragment available, which -/// includes profile information for the plan itself as well as the output sink. -/// -/// The ReportStatusCallback passed into the c'tor is invoked periodically to report the -/// execution profile. The frequency of those reports is controlled by the flag -/// status_report_interval; setting that flag to 0 disables periodic reporting altogether -/// Regardless of the value of that flag, if a report callback is specified, it is invoked -/// at least once at the end of execution with an overall status and profile (and 'done' -/// indicator). -/// -/// Aside from Cancel(), which may be called asynchronously, this class is not -/// thread-safe. -class PlanFragmentExecutor { - public: - /// Callback to report execution status of plan fragment. - /// 'profile' is the cumulative profile, 'done' indicates whether the execution - /// is done or still continuing. - /// Note: this does not take a const RuntimeProfile&, because it might need to call - /// functions like PrettyPrint() or ToThrift(), neither of which is const - /// because they take locks. 
- typedef boost::function< - void (const Status& status, RuntimeProfile* profile, bool done)> - ReportStatusCallback; - - /// report_status_cb, if !empty(), is used to report the accumulated profile - /// information periodically during execution. - PlanFragmentExecutor(const ReportStatusCallback& report_status_cb); - - /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec() - /// indicated that execution is finished, or to delete one that has not been Close()'d - /// if Prepare() has been called. - ~PlanFragmentExecutor(); - - /// Prepare for execution. Call this prior to Open(). - /// - /// runtime_state() will not be valid until Prepare() is called. runtime_state() will - /// always be valid after Prepare() returns, unless the query was cancelled before - /// Prepare() was called. If request.query_options.mem_limit > 0, it is used as an - /// approximate limit on the number of bytes this query can consume at runtime. The - /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit. - /// - /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns - /// Status::CANCELLED; - /// - /// If Prepare() fails, it will invoke final status callback with the error status. - /// TODO: remove desc_tbl parameter once we do a per-query exec rpc (and we - /// have a single descriptor table to cover all fragment instances); at the moment - /// we need to pass the TDescriptorTable explicitly - Status Prepare(QueryState* query_state, const TDescriptorTable& desc_tbl, - const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx); - - /// Opens the fragment plan and sink. Starts the profile reporting thread, if - /// required. Can be called only if Prepare() succeeded. If Open() fails it will - /// invoke the final status callback with the error status. - /// TODO: is this needed? It's only ever called in conjunction with Exec() and Close() - Status Open(); - - /// Executes the fragment by repeatedly driving the sink with batches produced by the - /// exec node tree. report_status_cb will have been called for the final time when - /// Exec() returns, and the status-reporting thread will have been stopped. Can be - /// called only if Open() succeeded. - Status Exec(); - - /// Closes the underlying plan fragment and frees up all resources allocated in - /// Prepare() and Open(). Must be called if Prepare() has been called - no matter - /// whether or not Prepare() succeeded. - void Close(); - - /// Initiate cancellation. If called concurrently with Prepare(), will wait for - /// Prepare() to finish in order to properly tear down Prepare()'d state. - /// - /// Cancel() may be called more than once. Calls after the first will have no - /// effect. Duplicate calls to Cancel() are not serialised, and may safely execute - /// concurrently. - /// - /// It is legal to call Cancel() if Prepare() returned an error. - void Cancel(); - - /// call these only after Prepare() - RuntimeState* runtime_state() { return runtime_state_.get(); } - - /// Profile information for plan and output sink. - RuntimeProfile* profile(); - - /// Blocks until Prepare() is completed. - Status WaitForPrepare() { return prepared_promise_.Get(); } - - /// Blocks until exec tree and sink are both opened. It is an error to call this before - /// Prepare() has completed. If Prepare() returned an error, WaitForOpen() will - /// return that error without blocking. 
- Status WaitForOpen(); - - /// Returns fragment instance's sink if this is the root fragment instance. Valid after - /// Prepare() returns; if Prepare() fails may be nullptr. - PlanRootSink* root_sink() { return root_sink_; } - - /// Name of the counter that is tracking per query, per host peak mem usage. - static const std::string PER_HOST_PEAK_MEM_COUNTER; - - private: - ExecNode* exec_tree_; // lives in runtime_state_->obj_pool() - TUniqueId query_id_; - - /// profile reporting-related - ReportStatusCallback report_status_cb_; - boost::scoped_ptr<Thread> report_thread_; - boost::mutex report_thread_lock_; - - /// Indicates that profile reporting thread should stop. - /// Tied to report_thread_lock_. - boost::condition_variable stop_report_thread_cv_; - - /// Indicates that profile reporting thread started. - /// Tied to report_thread_lock_. - boost::condition_variable report_thread_started_cv_; - - /// When the report thread starts, it sets 'report_thread_active_' to true and signals - /// 'report_thread_started_cv_'. The report thread is shut down by setting - /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'. Protected - /// by 'report_thread_lock_'. - bool report_thread_active_; - - /// true if Close() has been called - bool closed_; - - /// true if this fragment has not returned the thread token to the thread resource mgr - bool has_thread_token_; - - /// 'runtime_state_' has to be before 'sink_' as 'sink_' relies on the object pool of - /// 'runtime_state_'. This means 'sink_' is destroyed first so any implicit connections - /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to - /// the dtor of 'runtime_state_'. - boost::scoped_ptr<RuntimeState> runtime_state_; - - /// Profile for timings for each stage of the plan fragment instance's lifecycle. - RuntimeProfile* timings_profile_; - - /// Output sink for rows sent to this fragment. Created in Prepare(), owned by this - /// object. - boost::scoped_ptr<DataSink> sink_; - - /// Set if this fragment instance is the root of the entire plan, so that a consumer can - /// pull results by calling root_sink_->GetNext(). Same object as sink_. - PlanRootSink* root_sink_ = nullptr; - - boost::scoped_ptr<RowBatch> row_batch_; - - /// Protects is_prepared_ and is_cancelled_, and is also used to coordinate between - /// Prepare() and Cancel() to ensure mutual exclusion. - boost::mutex prepare_lock_; - - /// True if Prepare() has been called and done some work - even if it returned an - /// error. If Cancel() was called before Prepare(), is_prepared_ will not be set. - bool is_prepared_; - - /// Set when Prepare() returns. - Promise<Status> prepared_promise_; - - /// Set when OpenInternal() returns. - Promise<Status> opened_promise_; - - /// True if and only if Cancel() has been called. - bool is_cancelled_; - - /// A counter for the per query, per host peak mem usage. Note that this is not the - /// max of the peak memory of all fragments running on a host since it needs to take - /// into account when they are running concurrently. All fragments for a single query - /// on a single host will have the same value for this counter. - RuntimeProfile::Counter* per_host_mem_usage_; - - /// Number of rows returned by this fragment - RuntimeProfile::Counter* rows_produced_counter_; - - /// Average number of thread tokens for the duration of the plan fragment execution. - /// Fragments that do a lot of cpu work (non-coordinator fragment) will have at - /// least 1 token. 
Fragments that contain a hdfs scan node will have 1+ tokens - /// depending on system load. Other nodes (e.g. hash join node) can also reserve - /// additional tokens. - /// This is a measure of how much CPU resources this fragment used during the course - /// of the execution. - RuntimeProfile::Counter* average_thread_tokens_; - - /// Sampled memory usage at even time intervals. - RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_; - - /// Sampled thread usage (tokens) at even time intervals. - RuntimeProfile::TimeSeriesCounter* thread_usage_sampled_counter_; - - ObjectPool* obj_pool() { return runtime_state_->obj_pool(); } - - /// typedef for TPlanFragmentInstanceCtx.per_node_scan_ranges - typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges; - - /// Main loop of profile reporting thread. - /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to - /// false. This will not send the final report. - void ReportProfileThread(); - - /// Invoked the report callback. If 'done' is true, sends the final report with - /// 'status' and the profile. This type of report is sent once and only by the - /// instance execution thread. Otherwise, a profile-only report is sent, which the - /// ReportProfileThread() thread will do periodically. - void SendReport(bool done, const Status& status); - - /// Called when the fragment execution is complete to finalize counters and send - /// the final status report. Must be called only once. - void FragmentComplete(const Status& status); - - /// Optimizes the code-generated functions in runtime_state_->llvm_codegen(). - /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open(). - /// Returns error if LLVM optimization or compilation fails. - Status OptimizeLlvmModule(); - - /// Executes Open() logic and returns resulting status. Does not set status_. - Status OpenInternal(); - - /// Pulls row batches from fragment instance and pushes them to sink_ in a loop. Returns - /// OK if the input was exhausted and sent to the sink successfully, an error otherwise. - /// If ExecInternal() returns without an error condition, all rows will have been sent - /// to the sink, the sink will have been closed, a final report will have been sent and - /// the report thread will have been stopped. - Status ExecInternal(); - - /// Performs all the logic of Prepare() and returns resulting status. - /// TODO: remove desc_tbl parameter as part of per-query exec rpc - Status PrepareInternal(QueryState* qs, const TDescriptorTable& desc_tbl, - const TPlanFragmentCtx& fragment_ctx, - const TPlanFragmentInstanceCtx& instance_ctx); - - /// Releases the thread token for this fragment executor. - void ReleaseThreadToken(); - - /// Stops report thread, if one is running. Blocks until report thread terminates. - /// Idempotent. - void StopReportThread(); - - /// Print stats about scan ranges for each volumeId in params to info log. 
- void PrintVolumeIds(const PerNodeScanRanges& per_node_scan_ranges); - - const DescriptorTbl& desc_tbl() { return runtime_state_->desc_tbl(); } -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-exec-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc index 8b72ed9..6057b52 100644 --- a/be/src/runtime/query-exec-mgr.cc +++ b/be/src/runtime/query-exec-mgr.cc @@ -41,48 +41,51 @@ using namespace impala; DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage " "every log_mem_usage_interval'th fragment completion."); -Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) { - TUniqueId instance_id = params.fragment_instance_ctx.fragment_instance_id; - VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id) +Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) { + TUniqueId query_id = params.query_ctx.query_id; + VLOG_QUERY << "StartQueryFInstances() query_id=" << PrintId(query_id) << " coord=" << params.query_ctx.coord_address; bool dummy; - QueryState* qs = GetOrCreateQueryState( - params.query_ctx, params.fragment_instance_ctx.request_pool, &dummy); - DCHECK(params.__isset.fragment_ctx); - DCHECK(params.__isset.fragment_instance_ctx); - Status status = qs->Prepare(); + QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy); + Status status = qs->Init(params); if (!status.ok()) { ReleaseQueryState(qs); return status; } - - FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState( - qs, params.fragment_ctx, params.fragment_instance_ctx, params.query_ctx.desc_tbl)); - // register instance before returning so that async Cancel() calls can - // find the instance - qs->RegisterFInstance(fis); - // start new thread to execute instance + // avoid blocking the rpc handler thread for too long by starting a new thread for + // query startup (which takes ownership of the QueryState reference) Thread t("query-exec-mgr", - Substitute("exec-fragment-instance-$0", PrintId(instance_id)), - &QueryExecMgr::ExecFInstance, this, fis); + Substitute("start-query-finstances-$0", PrintId(query_id)), + &QueryExecMgr::StartQueryHelper, this, qs); t.Detach(); - - ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L); - ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L); return Status::OK(); } -QueryState* QueryExecMgr::CreateQueryState( - const TQueryCtx& query_ctx, const string& request_pool) { +QueryState* QueryExecMgr::CreateQueryState(const TQueryCtx& query_ctx) { bool created; - QueryState* qs = GetOrCreateQueryState(query_ctx, request_pool, &created); + QueryState* qs = GetOrCreateQueryState(query_ctx, &created); DCHECK(created); return qs; } +QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) { + QueryState* qs = nullptr; + int refcnt; + { + lock_guard<mutex> l(qs_map_lock_); + auto it = qs_map_.find(query_id); + if (it == qs_map_.end()) return nullptr; + qs = it->second; + refcnt = qs->refcnt_.Add(1); + } + DCHECK(qs != nullptr && refcnt > 0); + VLOG_QUERY << "QueryState: query_id=" << query_id << " refcnt=" << refcnt; + return qs; +} + QueryState* QueryExecMgr::GetOrCreateQueryState( - const TQueryCtx& query_ctx, const string& request_pool, bool* created) { + const TQueryCtx& query_ctx, bool* created) { QueryState* qs = nullptr; int refcnt; { @@ -90,30 +93,26 @@ QueryState* 
QueryExecMgr::GetOrCreateQueryState( auto it = qs_map_.find(query_ctx.query_id); if (it == qs_map_.end()) { // register new QueryState - qs = new QueryState(query_ctx, request_pool); + qs = new QueryState(query_ctx); qs_map_.insert(make_pair(query_ctx.query_id, qs)); - VLOG_QUERY << "new QueryState: query_id=" << query_ctx.query_id; *created = true; } else { qs = it->second; *created = false; } - // decremented at the end of ExecFInstance() + // decremented by ReleaseQueryState() refcnt = qs->refcnt_.Add(1); } - DCHECK(qs != nullptr && qs->refcnt_.Load() > 0); - VLOG_QUERY << "QueryState: query_id=" << query_ctx.query_id << " refcnt=" << refcnt; + DCHECK(qs != nullptr && refcnt > 0); return qs; } -void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) { - fis->Exec(); - ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L); - VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id()); +void QueryExecMgr::StartQueryHelper(QueryState* qs) { + qs->StartFInstances(); #ifndef ADDRESS_SANITIZER - // tcmalloc and address sanitizer can not be used together + // tcmalloc and address sanitizer cannot be used together if (FLAGS_log_mem_usage_interval > 0) { uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value(); if (num_complete % FLAGS_log_mem_usage_interval == 0) { @@ -125,19 +124,8 @@ void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) { } #endif - // decrement refcount taken in StartFInstance() - ReleaseQueryState(fis->query_state()); -} - -QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) { - VLOG_QUERY << "GetQueryState(): query_id=" << PrintId(query_id); - lock_guard<mutex> l(qs_map_lock_); - auto it = qs_map_.find(query_id); - if (it == qs_map_.end()) return nullptr; - QueryState* qs = it->second; - int32_t cnt = qs->refcnt_.Add(1); - DCHECK_GT(cnt, 0); - return qs; + // decrement refcount taken in StartQuery() + ReleaseQueryState(qs); } void QueryExecMgr::ReleaseQueryState(QueryState* qs) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-exec-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h index 92e756f..8a0c884 100644 --- a/be/src/runtime/query-exec-mgr.h +++ b/be/src/runtime/query-exec-mgr.h @@ -37,30 +37,24 @@ class FragmentInstanceState; /// A daemon-wide registry and manager of QueryStates. This is the central /// entry point for gaining refcounted access to a QueryState. It also initiates -/// fragment instance execution. +/// query execution. /// Thread-safe. -/// -/// TODO: as part of Impala-2550 (per-query exec rpc) -/// replace Start-/CancelFInstance() with StartQuery()/CancelQuery() class QueryExecMgr { public: - /// Initiates execution of this fragment instance in a newly created thread. - /// Also creates a QueryState for this query, if none exists. - /// In both cases it increases the refcount prior to instance execution and decreases - /// it after execution finishes. + /// Creates QueryState if it doesn't exist and initiates execution of all fragment + /// instance for this query. All fragment instances hold a reference to their + /// QueryState for the duration of their execution. /// - /// Returns an error if there was some unrecoverable problem before the fragment - /// was started (like low memory). In that case, no QueryState is created or has its - /// refcount incremented. 
After this call returns, it is legal to call - /// FragmentInstanceState::Cancel() on this fragment instance, regardless of the - /// return value of this function. - Status StartFInstance(const TExecPlanFragmentParams& params); + /// Returns an error if there was some unrecoverable problem before any instance + /// was started (like low memory). In that case, no QueryState is created. + /// After this function returns, it is legal to call QueryState::Cancel(), regardless of + /// the return value of this function. + Status StartQuery(const TExecQueryFInstancesParams& params); /// Creates a QueryState for the given query with the provided parameters. Only valid /// to call if the QueryState does not already exist. The caller must call /// ReleaseQueryState() with the returned QueryState to decrement the refcount. - QueryState* CreateQueryState( - const TQueryCtx& query_ctx, const std::string& request_pool); + QueryState* CreateQueryState(const TQueryCtx& query_ctx); /// If a QueryState for the given query exists, increments that refcount and returns /// the QueryState, otherwise returns nullptr. @@ -78,11 +72,11 @@ class QueryExecMgr { /// Gets the existing QueryState or creates a new one if not present. /// 'created' is set to true if it was created, false otherwise. - QueryState* GetOrCreateQueryState( - const TQueryCtx& query_ctx, const std::string& request_pool, bool* created); + /// Increments the refcount. + QueryState* GetOrCreateQueryState(const TQueryCtx& query_ctx, bool* created); - /// Execute instance. - void ExecFInstance(FragmentInstanceState* fis); + /// Execute instances and decrement refcount (acquire ownership of qs). + void StartQueryHelper(QueryState* qs); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 2cc4818..1ae0f36 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -21,20 +21,27 @@ #include <boost/thread/locks.hpp> #include <kudu/client/client.h> +#include "exprs/expr.h" #include "exec/kudu-util.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/backend-client.h" #include "runtime/exec-env.h" #include "runtime/fragment-instance-state.h" #include "runtime/mem-tracker.h" #include "runtime/query-exec-mgr.h" +#include "runtime/runtime-state.h" #include "util/debug-util.h" +#include "util/impalad-metrics.h" +#include "util/thread.h" #include "common/names.h" using boost::algorithm::join; using namespace impala; +#define RETRY_SLEEP_MS 100 + struct QueryState::KuduClientPtr { kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client; }; @@ -49,14 +56,17 @@ QueryState::ScopedRef::~ScopedRef() { ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_); } -QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool) +QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool) : query_ctx_(query_ctx), refcnt_(0), - prepared_(false), - released_resources_(false), - buffer_reservation_(nullptr), - file_group_(nullptr) { - TQueryOptions& query_options = query_ctx_.client_request.query_options; + is_cancelled_(0) { + if (query_ctx_.request_pool.empty()) { + // fix up pool name for tests + DCHECK(!request_pool.empty()); + const_cast<TQueryCtx&>(query_ctx_).request_pool = request_pool; + } + TQueryOptions& query_options 
= + const_cast<TQueryOptions&>(query_ctx_.client_request.query_options); // max_errors does not indicate how many errors in total have been recorded, but rather // how many are distinct. It is defined as the sum of the number of generic errors and // the number of distinct other errors. @@ -66,59 +76,56 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool) if (query_options.batch_size <= 0) { query_options.__set_batch_size(DEFAULT_BATCH_SIZE); } - InitMemTrackers(pool); + InitMemTrackers(); } void QueryState::ReleaseResources() { + DCHECK(!released_resources_); // Clean up temporary files. if (file_group_ != nullptr) file_group_->Close(); // Release any remaining reservation. if (buffer_reservation_ != nullptr) buffer_reservation_->Close(); // Avoid dangling reference from the parent of 'query_mem_tracker_'. if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent(); + if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources(); released_resources_ = true; } QueryState::~QueryState() { DCHECK(released_resources_); + DCHECK_EQ(refcnt_.Load(), 0); } -Status QueryState::Prepare() { - lock_guard<SpinLock> l(prepare_lock_); - if (prepared_) { - DCHECK(prepare_status_.ok()); - return Status::OK(); - } - RETURN_IF_ERROR(prepare_status_); - - Status status; +Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) { // Starting a new query creates threads and consumes a non-trivial amount of memory. // If we are already starved for memory, fail as early as possible to avoid consuming // more resources. ExecEnv* exec_env = ExecEnv::GetInstance(); MemTracker* process_mem_tracker = exec_env->process_mem_tracker(); if (process_mem_tracker->LimitExceeded()) { - string msg = Substitute("Query $0 could not start because the backend Impala daemon " - "is over its memory limit", - PrintId(query_id())); - status = process_mem_tracker->MemLimitExceeded(NULL, msg, 0); - goto error; + string msg = Substitute( + "Query $0 could not start because the backend Impala daemon " + "is over its memory limit", PrintId(query_id())); + RETURN_IF_ERROR(process_mem_tracker->MemLimitExceeded(NULL, msg, 0)); } // Do buffer-pool-related setup if running in a backend test that explicitly created // the pool. 
- if (exec_env->buffer_pool() != nullptr) { - status = InitBufferPoolState(); - if (!status.ok()) goto error; - } - prepared_ = true; - return Status::OK(); + if (exec_env->buffer_pool() != nullptr) RETURN_IF_ERROR(InitBufferPoolState()); -error: - prepare_status_ = status; - return status; + // don't copy query_ctx, it's large and we already did that in the c'tor + rpc_params_.__set_coord_state_idx(rpc_params.coord_state_idx); + TExecQueryFInstancesParams& non_const_params = + const_cast<TExecQueryFInstancesParams&>(rpc_params); + rpc_params_.fragment_ctxs.swap(non_const_params.fragment_ctxs); + rpc_params_.__isset.fragment_ctxs = true; + rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs); + rpc_params_.__isset.fragment_instance_ctxs = true; + + return Status::OK(); } -void QueryState::InitMemTrackers(const std::string& pool) { +void QueryState::InitMemTrackers() { + const string& pool = query_ctx_.request_pool; int64_t bytes_limit = -1; if (query_options().__isset.mem_limit && query_options().mem_limit > 0) { bytes_limit = query_options().mem_limit; @@ -160,23 +167,194 @@ Status QueryState::InitBufferPoolState() { return Status::OK(); } -void QueryState::RegisterFInstance(FragmentInstanceState* fis) { - VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id()); - lock_guard<SpinLock> l(fis_map_lock_); - DCHECK_EQ(fis_map_.count(fis->instance_id()), 0); - fis_map_.insert(make_pair(fis->instance_id(), fis)); -} - FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) { VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id); - lock_guard<SpinLock> l(fis_map_lock_); + if (!instances_prepared_promise_.Get().ok()) return nullptr; auto it = fis_map_.find(instance_id); return it != fis_map_.end() ? it->second : nullptr; } -Status QueryState::GetKuduClient(const std::vector<std::string>& master_addresses, - kudu::client::KuduClient** client) { - std::string master_addr_concat = join(master_addresses, ","); +void QueryState::ReportExecStatus(bool done, const Status& status, + FragmentInstanceState* fis) { + ReportExecStatusAux(done, status, fis, true); +} + +void QueryState::ReportExecStatusAux(bool done, const Status& status, + FragmentInstanceState* fis, bool instances_started) { + // if we're reporting an error, we're done + DCHECK(status.ok() || done); + // if this is not for a specific fragment instance, we're reporting an error + DCHECK(fis != nullptr || !status.ok()); + DCHECK(fis == nullptr || fis->IsPrepared()); + + // This will send a report even if we are cancelled. If the query completed correctly + // but fragments still need to be cancelled (e.g. limit reached), the coordinator will + // be waiting for a final report and profile. 
+ + Status coord_status; + ImpalaBackendConnection coord(ExecEnv::GetInstance()->impalad_client_cache(), + query_ctx().coord_address, &coord_status); + if (!coord_status.ok()) { + // TODO: this might flood the log + LOG(WARNING) << "Couldn't get a client for " << query_ctx().coord_address + <<"\tReason: " << coord_status.GetDetail(); + if (instances_started) Cancel(); + return; + } + + TReportExecStatusParams params; + params.protocol_version = ImpalaInternalServiceVersion::V1; + params.__set_query_id(query_ctx().query_id); + DCHECK(rpc_params().__isset.coord_state_idx); + params.__set_coord_state_idx(rpc_params().coord_state_idx); + + if (fis != nullptr) { + // create status for 'fis' + params.instance_exec_status.emplace_back(); + params.__isset.instance_exec_status = true; + TFragmentInstanceExecStatus& instance_status = params.instance_exec_status.back(); + instance_status.__set_fragment_instance_id(fis->instance_id()); + status.SetTStatus(&instance_status); + instance_status.__set_done(done); + + if (fis->profile() != nullptr) { + fis->profile()->ToThrift(&instance_status.profile); + instance_status.__isset.profile = true; + } + + // Only send updates to insert status if fragment is finished, the coordinator + // waits until query execution is done to use them anyhow. + if (done) { + TInsertExecStatus insert_status; + if (fis->runtime_state()->hdfs_files_to_move()->size() > 0) { + insert_status.__set_files_to_move(*fis->runtime_state()->hdfs_files_to_move()); + } + if (fis->runtime_state()->per_partition_status()->size() > 0) { + insert_status.__set_per_partition_status( + *fis->runtime_state()->per_partition_status()); + } + params.__set_insert_exec_status(insert_status); + } + + // Send new errors to coordinator + fis->runtime_state()->GetUnreportedErrors(¶ms.error_log); + params.__isset.error_log = (params.error_log.size() > 0); + } + + TReportExecStatusResult res; + Status rpc_status; + bool retry_is_safe; + // Try to send the RPC 3 times before failing. + for (int i = 0; i < 3; ++i) { + rpc_status = coord.DoRpc( + &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe); + if (rpc_status.ok()) break; + if (!retry_is_safe) break; + if (i < 2) SleepForMs(RETRY_SLEEP_MS); + } + Status result_status(res.status); + if ((!rpc_status.ok() || !result_status.ok()) && instances_started) { + // TODO: should we try to keep rpc_status for the final report? (but the final + // report, following this Cancel(), may not succeed anyway.) 
+ // TODO: not keeping an error status here means that all instances might + // abort with CANCELLED status, despite there being an error + Cancel(); + } +} + +Status QueryState::WaitForPrepare() { + return instances_prepared_promise_.Get(); +} + +void QueryState::StartFInstances() { + VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id()) + << " #instances=" << rpc_params_.fragment_instance_ctxs.size(); + DCHECK_GT(refcnt_.Load(), 0); + + // set up desc tbl + DCHECK(query_ctx().__isset.desc_tbl); + Status status = DescriptorTbl::Create( + &obj_pool_, query_ctx().desc_tbl, query_mem_tracker_, &desc_tbl_); + if (!status.ok()) { + instances_prepared_promise_.Set(status); + ReportExecStatusAux(true, status, nullptr, false); + return; + } + VLOG_QUERY << "descriptor table for query=" << PrintId(query_id()) + << "\n" << desc_tbl_->DebugString(); + + DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0); + TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0]; + int fragment_ctx_idx = 0; + for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) { + // determine corresponding TPlanFragmentCtx + if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) { + ++fragment_ctx_idx; + DCHECK_LT(fragment_ctx_idx, rpc_params_.fragment_ctxs.size()); + fragment_ctx = &rpc_params_.fragment_ctxs[fragment_ctx_idx]; + // we expect fragment and instance contexts to follow the same order + DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx); + } + FragmentInstanceState* fis = obj_pool_.Add( + new FragmentInstanceState(this, *fragment_ctx, instance_ctx)); + fis_map_.emplace(fis->instance_id(), fis); + + // update fragment_map_ + vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx]; + fis_list.push_back(fis); + + // start new thread to execute instance + refcnt_.Add(1); // decremented in ExecFInstance() + Thread t("query-state", + Substitute( + "exec-query-finstance-$0", PrintId(instance_ctx.fragment_instance_id)), + &QueryState::ExecFInstance, this, fis); + t.Detach(); + } + + // don't return until every instance is prepared and record the first non-OK + // (non-CANCELLED if available) status + Status prepare_status; + for (auto entry: fis_map_) { + Status instance_status = entry.second->WaitForPrepare(); + // don't wipe out an error in one instance with the resulting CANCELLED from + // the remaining instances + if (!instance_status.ok() && (prepare_status.ok() || prepare_status.IsCancelled())) { + prepare_status = instance_status; + } + } + instances_prepared_promise_.Set(prepare_status); +} + +void QueryState::ExecFInstance(FragmentInstanceState* fis) { + ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L); + ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L); + VLOG_QUERY << "Executing instance. instance_id=" << PrintId(fis->instance_id()) + << " fragment_idx=" << fis->instance_ctx().fragment_idx + << " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx + << " coord_state_idx=" << rpc_params().coord_state_idx + << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value(); + Status status = fis->Exec(); + ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L); + VLOG_QUERY << "Instance completed. 
instance_id=" << PrintId(fis->instance_id()) + << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value() + << " status=" << status; + // initiate cancellation if nobody has done so yet + if (!status.ok()) Cancel(); + // decrement refcount taken in StartFInstances() + ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this); +} + +void QueryState::Cancel() { + VLOG_QUERY << "Cancel: query_id=" << query_id(); + (void) instances_prepared_promise_.Get(); + if (!is_cancelled_.CompareAndSwap(0, 1)) return; + for (auto entry: fis_map_) entry.second->Cancel(); +} + +Status QueryState::GetKuduClient( + const vector<string>& master_addresses, kudu::client::KuduClient** client) { + string master_addr_concat = join(master_addresses, ","); lock_guard<SpinLock> l(kudu_client_map_lock_); auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat); if (kudu_client_map_it == kudu_client_map_.end()) { @@ -191,3 +369,12 @@ Status QueryState::GetKuduClient(const std::vector<std::string>& master_addresse } return Status::OK(); } + +void QueryState::PublishFilter(int32_t filter_id, int fragment_idx, + const TBloomFilter& thrift_bloom_filter) { + if (!instances_prepared_promise_.Get().ok()) return; + DCHECK_EQ(fragment_map_.count(fragment_idx), 1); + for (FragmentInstanceState* fis: fragment_map_[fragment_idx]) { + fis->PublishFilter(filter_id, thrift_bloom_filter); + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h index d7ce10f..5660a49 100644 --- a/be/src/runtime/query-state.h +++ b/be/src/runtime/query-state.h @@ -29,6 +29,7 @@ #include "runtime/tmp-file-mgr.h" #include "util/spinlock.h" #include "util/uid-util.h" +#include "util/promise.h" namespace kudu { namespace client { class KuduClient; } } @@ -44,17 +45,35 @@ class ReservationTracker; /// instances; in contrast, fragment instance-specific state is collected in /// FragmentInstanceState. /// -/// The lifetime of an instance of this class is dictated by a reference count. -/// Any thread that executes on behalf of a query, and accesses any of its state, -/// must obtain a reference to the corresponding QueryState and hold it for at least the +/// The lifetime of a QueryState is dictated by a reference count. Any thread that +/// executes on behalf of a query, and accesses any of its state, must obtain a +/// reference to the corresponding QueryState and hold it for at least the /// duration of that access. The reference is obtained and released via /// QueryExecMgr::Get-/ReleaseQueryState() or via QueryState::ScopedRef (the latter /// for references limited to the scope of a single function or block). -/// As long as the reference count is greater than 0, all query state (contained -/// either in this class or accessible through this class, such as the -/// FragmentInstanceStates) is guaranteed to be alive. +/// As long as the reference count is greater than 0, all of a query's control +/// structures (contained either in this class or accessible through this class, such +/// as the FragmentInstanceStates) are guaranteed to be alive. +/// +/// When any fragment instance execution returns with an error status, all +/// fragment instances are automatically cancelled. +/// +/// Status reporting: all instances currently report their status independently. 
+/// Each instance sends at least one final status report with its overall execution +/// status, so if any of the instances encountered an error, that error will be reported. /// /// Thread-safe, unless noted otherwise. +/// +/// TODO: +/// - set up kudu clients in Init(), remove related locking +/// - release resources (those referenced directly or indirectly by the query result +/// set) automatically when all instances have finished execution +/// (either by returning all rows or by being cancelled), rather than waiting for an +/// explicit call to ReleaseResources() +/// - when ReportExecStatus() encounters an error, query execution at this node +/// gets aborted, but it's possible for the coordinator not to find out about that; +/// fix the coordinator to periodically ping the backends (should the coordinator +/// simply poll for the status reports?) class QueryState { public: /// Use this class to obtain a QueryState for the duration of a function/block, @@ -84,89 +103,126 @@ class QueryState { /// a shared pool for all objects that have query lifetime ObjectPool* obj_pool() { return &obj_pool_; } - /// This TQueryCtx was copied from the first fragment instance which led to the - /// creation of this QueryState. For all subsequently arriving fragment instances the - /// desc_tbl in this context will be incorrect, therefore query_ctx().desc_tbl should - /// not be used. This restriction will go away with the switch to a per-query exec - /// rpc. const TQueryCtx& query_ctx() const { return query_ctx_; } - - const TUniqueId& query_id() const { return query_ctx_.query_id; } - + const TUniqueId& query_id() const { return query_ctx().query_id; } const TQueryOptions& query_options() const { return query_ctx_.client_request.query_options; } - MemTracker* query_mem_tracker() const { return query_mem_tracker_; } + + // the following getters are only valid after Prepare() ReservationTracker* buffer_reservation() const { return buffer_reservation_; } TmpFileMgr::FileGroup* file_group() const { return file_group_; } + const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; } + + // the following getters are only valid after StartFInstances() + const DescriptorTbl& desc_tbl() const { return *desc_tbl_; } /// Sets up state required for fragment execution: memory reservations, etc. Fails - /// if resources could not be acquired. Safe to call concurrently and idempotent: - /// the first thread to call this does the setup work. - Status Prepare(); + /// if resources could not be acquired. Uses few cycles and never blocks. + /// Not idempotent, not thread-safe. + /// The remaining public functions must be called only after Init(). + Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT; + + /// Performs the runtime-intensive parts of initial setup and starts all fragment + /// instances belonging to this query. Each instance receives its own execution + /// thread. Blocks until all fragment instances have finished their Prepare phase. + /// Not idempotent, not thread-safe. + void StartFInstances(); + + /// Return overall status of Prepare phases of fragment instances. A failure + /// in any instance's Prepare will cause this function to return an error status. + /// Blocks until all fragment instances have finished their Prepare phase. + Status WaitForPrepare(); + + /// Blocks until all fragment instances have finished their Prepare phase. + FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id); - /// Registers a new FInstanceState. 
- void RegisterFInstance(FragmentInstanceState* fis); + /// Blocks until all fragment instances have finished their Prepare phase. + void PublishFilter(int32_t filter_id, int fragment_idx, + const TBloomFilter& thrift_bloom_filter); - /// Returns the instance state or nullptr if the instance id has not previously - /// been registered. The returned FIS is valid for the duration of the QueryState. - FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id); + /// Cancels all actively executing fragment instances. Blocks until all fragment + /// instances have finished their Prepare phase. Idempotent. + void Cancel(); /// Called once the query is complete to release any resources. - /// Must be called before destroying the QueryState. + /// Must be called only once and before destroying the QueryState. + /// Not idempotent, not thread-safe. void ReleaseResources(); /// Gets a KuduClient for this list of master addresses. It will lookup and share /// an existing KuduClient if possible. Otherwise, it will create a new KuduClient /// internally and return a pointer to it. All KuduClients accessed through this /// interface are owned by the QueryState. Thread safe. - Status GetKuduClient(const std::vector<std::string>& master_addrs, - kudu::client::KuduClient** client); + Status GetKuduClient( + const std::vector<std::string>& master_addrs, kudu::client::KuduClient** client) + WARN_UNUSED_RESULT; + + /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the + /// status must be an error. If fis is given, expects that fis finished its Prepare + /// phase; it then sends a report for that instance, including its profile. + /// If there is an error during the rpc, initiates cancellation. + void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis); ~QueryState(); private: friend class QueryExecMgr; + /// test execution + friend class RuntimeState; + static const int DEFAULT_BATCH_SIZE = 1024; - TQueryCtx query_ctx_; + /// set in c'tor + const TQueryCtx query_ctx_; - ObjectPool obj_pool_; - AtomicInt32 refcnt_; + /// the top-level MemTracker for this query (owned by obj_pool_), created in c'tor + MemTracker* query_mem_tracker_ = nullptr; - /// Held for duration of Prepare(). Protects 'prepared_', - /// 'prepare_status_' and the members initialized in Prepare(). - SpinLock prepare_lock_; + /// set in Prepare(); rpc_params_.query_ctx is *not* set to avoid duplication + /// with query_ctx_ + /// TODO: find a way not to have to copy this + TExecQueryFInstancesParams rpc_params_; - /// Non-OK if Prepare() failed the first time it was called. - /// All subsequent calls to Prepare() return this status. - Status prepare_status_; + /// Buffer reservation for this query (owned by obj_pool_) + /// Only non-null in backend tests that explicitly enabled the new buffer pool + /// Set in Prepare(). + /// TODO: this will always be non-null once IMPALA-3200 is done + ReservationTracker* buffer_reservation_ = nullptr; - /// True if Prepare() executed and finished successfully. - bool prepared_; + /// Temporary files for this query (owned by obj_pool_) + /// Only non-null in backend tests the explicitly enabled the new buffer pool + /// Set in Prepare(). + /// TODO: this will always be non-null once IMPALA-3200 is done + TmpFileMgr::FileGroup* file_group_ = nullptr; - /// True if and only if ReleaseResources() has been called. 
- bool released_resources_; + /// created in StartFInstances(), owned by obj_pool_ + DescriptorTbl* desc_tbl_ = nullptr; - SpinLock fis_map_lock_; // protects fis_map_ + /// Barrier for the completion of the Prepare phases of all fragment instances, + /// set in StartFInstances(). + Promise<Status> instances_prepared_promise_; - /// map from instance id to its state (owned by obj_pool_) + /// map from instance id to its state (owned by obj_pool_), populated in + /// StartFInstances(); not valid to read from until instances_prepare_promise_ + /// is set std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_; - /// The top-level MemTracker for this query (owned by obj_pool_). - MemTracker* query_mem_tracker_; + /// map from fragment index to its instances (owned by obj_pool_), populated in + /// StartFInstances() + std::unordered_map<int, std::vector<FragmentInstanceState*>> fragment_map_; - /// Buffer reservation for this query (owned by obj_pool_) - /// Only non-null in backend tests that explicitly enabled the new buffer pool - /// TODO: this will always be non-null once IMPALA-3200 is done - ReservationTracker* buffer_reservation_; + ObjectPool obj_pool_; + AtomicInt32 refcnt_; - /// Temporary files for this query (owned by obj_pool_) - /// Only non-null in backend tests the explicitly enabled the new buffer pool - /// TODO: this will always be non-null once IMPALA-3200 is done - TmpFileMgr::FileGroup* file_group_; + /// set to 1 when any fragment instance fails or when Cancel() is called; used to + /// initiate cancellation exactly once + AtomicInt32 is_cancelled_; + + /// True if and only if ReleaseResources() has been called. + bool released_resources_ = false; SpinLock kudu_client_map_lock_; // protects kudu_client_map_ @@ -184,16 +240,25 @@ class QueryState { /// that the master address lists be identical in order to share a KuduClient. KuduClientMap kudu_client_map_; - /// Create QueryState w/ copy of query_ctx and refcnt of 0. - /// The query is associated with the resource pool named 'pool' - QueryState(const TQueryCtx& query_ctx, const std::string& pool); + /// Create QueryState w/ refcnt of 0. + /// The query is associated with the resource pool query_ctx.request_pool or + /// 'request_pool', if the former is not set (needed for tests). + QueryState(const TQueryCtx& query_ctx, const std::string& request_pool = ""); + + /// Execute the fragment instance and decrement the refcnt when done. + void ExecFInstance(FragmentInstanceState* fis); /// Called from Prepare() to initialize MemTrackers. - void InitMemTrackers(const std::string& pool); + void InitMemTrackers(); - /// Called from PrepareForExecution() to setup buffer reservations and the + /// Called from Prepare() to setup buffer reservations and the /// file group. Fails if required resources are not available. - Status InitBufferPoolState(); + Status InitBufferPoolState() WARN_UNUSED_RESULT; + + /// Same behavior as ReportExecStatus(). + /// Cancel on error only if instances_started is true. 
+ void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis, + bool instances_started); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-filter-bank.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc index 3e2dc6d..2a47cac 100644 --- a/be/src/runtime/runtime-filter-bank.cc +++ b/be/src/runtime/runtime-filter-bank.cc @@ -151,9 +151,10 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id, if (has_remote_target && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) { + params.__set_filter_id(filter_id); + params.__set_query_id(state_->query_id()); BloomFilter::ToThrift(bloom_filter, &params.bloom_filter); - params.filter_id = filter_id; - params.query_id = state_->query_id(); + params.__isset.bloom_filter = true; ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>( SendFilterToCoordinator, state_->query_ctx().coord_address, params, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 62d0737..c5e2f59 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -74,16 +74,13 @@ namespace impala { RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env) - : desc_tbl_(nullptr), - obj_pool_(new ObjectPool()), - query_state_(query_state), + : query_state_(query_state), fragment_ctx_(&fragment_ctx), instance_ctx_(&instance_ctx), now_(new TimestampValue(query_state->query_ctx().now_string.c_str(), query_state->query_ctx().now_string.size())), exec_env_(exec_env), - profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)), - query_mem_tracker_(query_state_->query_mem_tracker()), + profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)), instance_buffer_reservation_(nullptr), is_cancelled_(false), root_node_id_(-1) { @@ -91,20 +88,21 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag } RuntimeState::RuntimeState( - const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool) - : obj_pool_(new ObjectPool()), - query_state_(nullptr), + const TQueryCtx& qctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl) + : query_state_(new QueryState(qctx, "test-pool")), fragment_ctx_(nullptr), instance_ctx_(nullptr), - local_query_ctx_(query_ctx), - now_(new TimestampValue(query_ctx.now_string.c_str(), query_ctx.now_string.size())), + local_query_state_(query_state_), + now_(new TimestampValue(qctx.now_string.c_str(), qctx.now_string.size())), exec_env_(exec_env), - profile_(obj_pool_.get(), "<unnamed>"), - query_mem_tracker_(MemTracker::CreateQueryMemTracker( - query_id(), query_options(), request_pool, obj_pool_.get())), + profile_(obj_pool(), "<unnamed>"), instance_buffer_reservation_(nullptr), is_cancelled_(false), root_node_id_(-1) { + if (query_ctx().request_pool.empty()) { + const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool"; + } + if (desc_tbl != nullptr) query_state_->desc_tbl_ = desc_tbl; Init(); } @@ -125,10 +123,10 @@ void RuntimeState::Init() { total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime"); 
instance_mem_tracker_.reset(new MemTracker( - runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_)); + runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker())); if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) { - instance_buffer_reservation_ = obj_pool_->Add(new ReservationTracker); + instance_buffer_reservation_ = obj_pool()->Add(new ReservationTracker); instance_buffer_reservation_->InitChildTracker(&profile_, query_state_->buffer_reservation(), instance_mem_tracker_.get(), numeric_limits<int64_t>::max()); @@ -143,7 +141,7 @@ Status RuntimeState::CreateBlockMgr() { DCHECK(block_mgr_.get() == NULL); // Compute the max memory the block mgr will use. - int64_t block_mgr_limit = query_mem_tracker_->lowest_limit(); + int64_t block_mgr_limit = query_mem_tracker()->lowest_limit(); if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max(); block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION), block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING); @@ -266,7 +264,6 @@ void RuntimeState::UnregisterReaderContexts() { void RuntimeState::ReleaseResources() { UnregisterReaderContexts(); - if (desc_tbl_ != nullptr) desc_tbl_->ClosePartitionExprs(this); if (filter_bank_ != nullptr) filter_bank_->Close(); if (resource_pool_ != nullptr) { exec_env_->thread_mgr()->UnregisterPool(resource_pool_); @@ -277,17 +274,22 @@ void RuntimeState::ReleaseResources() { // Release the reservation, which should be unused at the point. if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close(); - // 'query_mem_tracker_' must be valid as long as 'instance_mem_tracker_' is so + // 'query_mem_tracker()' must be valid as long as 'instance_mem_tracker_' is so // delete 'instance_mem_tracker_' first. // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so // break the link between 'instance_mem_tracker_' and its parent before // 'instance_mem_tracker_' and its children are destroyed. instance_mem_tracker_->UnregisterFromParent(); + if (instance_mem_tracker_->consumption() != 0) { + LOG(WARNING) << "Query " << query_id() << " may have leaked memory." << endl + << instance_mem_tracker_->LogUsage(); + } instance_mem_tracker_.reset(); - // If this RuntimeState owns 'query_mem_tracker_' it must deregister it. - if (query_state_ == nullptr) query_mem_tracker_->UnregisterFromParent(); - query_mem_tracker_ = nullptr; + if (local_query_state_.get() != nullptr) { + // if we created this QueryState, we must call ReleaseResources() + local_query_state_->ReleaseResources(); + } } const std::string& RuntimeState::GetEffectiveUser() const { @@ -314,14 +316,28 @@ HBaseTableFactory* RuntimeState::htable_factory() { return exec_env_->htable_factory(); } +ObjectPool* RuntimeState::obj_pool() const { + DCHECK(query_state_ != nullptr); + return query_state_->obj_pool(); +} + const TQueryCtx& RuntimeState::query_ctx() const { - return query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_; + DCHECK(query_state_ != nullptr); + return query_state_->query_ctx(); +} + +const DescriptorTbl& RuntimeState::desc_tbl() const { + DCHECK(query_state_ != nullptr); + return query_state_->desc_tbl(); } const TQueryOptions& RuntimeState::query_options() const { - const TQueryCtx& query_ctx = - query_state_ != nullptr ? 
query_state_->query_ctx() : local_query_ctx_; - return query_ctx.client_request.query_options; + return query_ctx().client_request.query_options; +} + +MemTracker* RuntimeState::query_mem_tracker() { + DCHECK(query_state_ != nullptr); + return query_state_->query_mem_tracker(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index db9948f..d70459f 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -81,9 +81,10 @@ class RuntimeState { RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env); - /// RuntimeState for executing expr in fe-support. + /// RuntimeState for test execution and fe-support.cc. Creates its own QueryState and + /// installs desc_tbl, if set. If query_ctx.request_pool isn't set, sets it to "test-pool". RuntimeState( - const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool); + const TQueryCtx& query_ctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl = nullptr); /// Empty d'tor to avoid issues with scoped_ptr. ~RuntimeState(); @@ -95,9 +96,9 @@ class RuntimeState { Status CreateBlockMgr(); QueryState* query_state() const { return query_state_; } - ObjectPool* obj_pool() const { return obj_pool_.get(); } - const DescriptorTbl& desc_tbl() const { return *desc_tbl_; } - void set_desc_tbl(DescriptorTbl* desc_tbl) { desc_tbl_ = desc_tbl; } + /// Return the query's ObjectPool + ObjectPool* obj_pool() const; + const DescriptorTbl& desc_tbl() const; const TQueryOptions& query_options() const; int batch_size() const { return query_options().batch_size; } bool abort_on_error() const { return query_options().abort_on_error; } @@ -128,7 +129,7 @@ class RuntimeState { CatalogServiceClientCache* catalogd_client_cache(); DiskIoMgr* io_mgr(); MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); } - MemTracker* query_mem_tracker() { return query_mem_tracker_; } + MemTracker* query_mem_tracker(); // reference to the query_state_'s memtracker ReservationTracker* instance_buffer_reservation() { return instance_buffer_reservation_; } @@ -253,7 +254,7 @@ class RuntimeState { Status LogOrReturnError(const ErrorMsg& message); bool is_cancelled() const { return is_cancelled_; } - void set_is_cancelled(bool v) { is_cancelled_ = v; } + void set_is_cancelled() { is_cancelled_ = true; } RuntimeProfile::Counter* total_storage_wait_timer() { return total_storage_wait_timer_; @@ -320,9 +321,6 @@ class RuntimeState { block_mgr_ = block_mgr; } - DescriptorTbl* desc_tbl_ = nullptr; - boost::scoped_ptr<ObjectPool> obj_pool_; - /// Lock protecting error_log_ SpinLock error_log_lock_; @@ -330,13 +328,12 @@ class RuntimeState { ErrorLogMap error_log_; /// Global QueryState and original thrift descriptors for this fragment instance. - /// Not set by the (const TQueryCtx&) c'tor. QueryState* const query_state_; const TPlanFragmentCtx* const fragment_ctx_; const TPlanFragmentInstanceCtx* const instance_ctx_; - /// Provides query ctx if query_state_ == nullptr. 
- TQueryCtx local_query_ctx_; + /// only populated by the (const QueryCtx&, ExecEnv*, DescriptorTbl*) c'tor + boost::scoped_ptr<QueryState> local_query_state_; /// Provides instance id if instance_ctx_ == nullptr TUniqueId no_instance_id_; @@ -377,16 +374,12 @@ class RuntimeState { /// Total CPU utilization for all threads in this plan fragment. RuntimeProfile::ThreadCounters* total_thread_statistics_; - /// Reference to the query MemTracker, owned by 'query_state_' if that is non-NULL - /// or stored in 'obj_pool_' otherwise. - MemTracker* query_mem_tracker_; - /// Memory usage of this fragment instance, a child of 'query_mem_tracker_'. boost::scoped_ptr<MemTracker> instance_mem_tracker_; /// Buffer reservation for this fragment instance - a child of the query buffer /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_ - /// was created by a backend test. Owned by 'obj_pool_'. + /// was created by a backend test. Owned by obj_pool(). ReservationTracker* instance_buffer_reservation_; /// if true, execution should stop with a CANCELLED status http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/test-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc index 026b2ee..37b4363 100644 --- a/be/src/runtime/test-env.cc +++ b/be/src/runtime/test-env.cc @@ -18,17 +18,15 @@ #include "runtime/test-env.h" #include <limits> +#include <memory> #include "runtime/buffered-block-mgr.h" #include "runtime/query-exec-mgr.h" #include "runtime/tmp-file-mgr.h" +#include "runtime/query-state.h" #include "util/disk-info.h" #include "util/impalad-metrics.h" - #include "gutil/strings/substitute.h" - -#include <memory> - #include "common/names.h" using boost::scoped_ptr; @@ -40,8 +38,8 @@ scoped_ptr<MetricGroup> TestEnv::static_metrics_; TestEnv::TestEnv() : have_tmp_file_mgr_args_(false), - buffer_pool_min_buffer_len_(1024), - buffer_pool_capacity_(0) {} + buffer_pool_min_buffer_len_(-1), + buffer_pool_capacity_(-1) {} Status TestEnv::Init() { if (static_metrics_ == NULL) { @@ -61,7 +59,9 @@ Status TestEnv::Init() { } else { RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics())); } - exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_); + if (buffer_pool_min_buffer_len_ != -1 && buffer_pool_capacity_ != -1) { + exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_); + } return Status::OK(); } @@ -80,7 +80,7 @@ void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) { TestEnv::~TestEnv() { // Queries must be torn down first since they are dependent on global state. TearDownQueries(); - exec_env_->disk_io_mgr_.reset(); + // tear down exec env state to avoid leaks exec_env_.reset(); } @@ -113,13 +113,23 @@ Status TestEnv::CreateQueryState( if (query_options != nullptr) query_ctx.client_request.query_options = *query_options; query_ctx.query_id.hi = 0; query_ctx.query_id.lo = query_id; + query_ctx.request_pool = "test-pool"; // CreateQueryState() enforces the invariant that 'query_id' must be unique. 
- QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, "test-pool"); + QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx); query_states_.push_back(qs); - RETURN_IF_ERROR(qs->Prepare()); - FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState( - qs, TPlanFragmentCtx(), TPlanFragmentInstanceCtx(), TDescriptorTable())); + // make sure to initialize data structures unrelated to the TExecQueryFInstancesParams + // param + TExecQueryFInstancesParams rpc_params; + // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState + rpc_params.__set_coord_state_idx(0); + rpc_params.__set_query_ctx(TQueryCtx()); + rpc_params.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()})); + rpc_params.__set_fragment_instance_ctxs( + vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()})); + RETURN_IF_ERROR(qs->Init(rpc_params)); + FragmentInstanceState* fis = qs->obj_pool()->Add( + new FragmentInstanceState(qs, qs->rpc_params().fragment_ctxs[0], qs->rpc_params().fragment_instance_ctxs[0])); RuntimeState* rs = qs->obj_pool()->Add( new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get())); runtime_states_.push_back(rs); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/test-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h index 30e9309..f314452 100644 --- a/be/src/runtime/test-env.h +++ b/be/src/runtime/test-env.h @@ -22,11 +22,12 @@ #include "runtime/exec-env.h" #include "runtime/fragment-instance-state.h" #include "runtime/mem-tracker.h" -#include "runtime/query-state.h" #include "runtime/runtime-state.h" namespace impala { +class QueryState; + /// Helper testing class that creates an environment with runtime memory management /// similar to the one used by the Impala runtime. Only one TestEnv can be active at a /// time, because it modifies the global ExecEnv singleton. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/scheduling/query-schedule.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc index bb44145..a035c7d 100644 --- a/be/src/scheduling/query-schedule.cc +++ b/be/src/scheduling/query-schedule.cc @@ -220,14 +220,6 @@ const TPlanFragment& FInstanceExecParams::fragment() const { return fragment_exec_params.fragment; } -int QuerySchedule::GetNumFragmentInstances() const { - int result = 0; - for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) { - result += fragment_exec_params.instance_exec_params.size(); - } - return result; -} - const TPlanFragment* QuerySchedule::GetCoordFragment() const { // Only have coordinator fragment for statements that return rows. 
if (request_.stmt_type != TStmtType::QUERY) return nullptr; @@ -262,4 +254,12 @@ vector<int> FragmentExecParams::GetInstanceIdxs() const { return result; } +int QuerySchedule::GetNumFragmentInstances() const { + int total = 0; + for (const FragmentExecParams& p: fragment_exec_params_) { + total += p.instance_exec_params.size(); + } + return total; +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/scheduling/query-schedule.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h index 47848e8..8a985c7 100644 --- a/be/src/scheduling/query-schedule.h +++ b/be/src/scheduling/query-schedule.h @@ -132,9 +132,6 @@ class QuerySchedule { /// Helper methods used by scheduler to populate this QuerySchedule. void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; } - /// Returns the total number of fragment instances. - int GetNumFragmentInstances() const; - /// Return the coordinator fragment, or nullptr if there isn't one. const TPlanFragment* GetCoordFragment() const; @@ -148,6 +145,9 @@ class QuerySchedule { return plan_node_to_fragment_idx_[id]; } + /// Return the total number of instances across all fragments. + int GetNumFragmentInstances() const; + /// Returns next instance id. Instance ids are consecutive numbers generated from /// the query id. /// If the query contains a coordinator fragment instance, the generated instance @@ -213,7 +213,7 @@ class QuerySchedule { // (TPlanFragment.idx) std::vector<FragmentExecParams> fragment_exec_params_; - /// The set of hosts that the query will run on excluding the coordinator. + /// The set of hosts that the query will run on including the coordinator. boost::unordered_set<TNetworkAddress> unique_hosts_; /// Total number of scan ranges of this query.
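
A minimal sketch of the control flow behind the StartFInstances()/ExecFInstance()/Cancel() hunks above, rebuilt on standard C++ rather than Impala's own primitives: std::promise stands in for Promise<Status> (the per-instance Prepare barrier), std::atomic<bool> for the AtomicInt32 compare-and-swap cancellation flag, and joined std::threads for the detached, refcounted instance threads. Status and FakeInstance are simplified stand-ins for illustration only, not Impala's classes.

#include <atomic>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Simplified stand-in for impala::Status.
struct Status {
  enum Code { OK, CANCELLED, ERROR };
  Status() : code(OK) {}
  Status(Code c, std::string m) : code(c), msg(std::move(m)) {}
  bool ok() const { return code == OK; }
  bool IsCancelled() const { return code == CANCELLED; }
  Code code;
  std::string msg;
};

// Hypothetical stand-in for FragmentInstanceState: Exec() runs on its own
// thread and fulfills the per-instance "prepared" promise before executing.
class FakeInstance {
 public:
  explicit FakeInstance(int id)
    : id_(id), prepared_future_(prepared_.get_future()) {}

  Status Exec(std::atomic<bool>* cancelled) {
    // Simulate the Prepare phase: instance 1 fails, the others succeed.
    Status prep = (id_ == 1) ? Status(Status::ERROR, "prepare failed") : Status();
    prepared_.set_value(prep);  // releases WaitForPrepare()
    if (!prep.ok()) return prep;
    // Simulate execution, honoring the shared cancellation flag.
    return cancelled->load() ? Status(Status::CANCELLED, "cancelled") : Status();
  }

  // Blocks until this instance has finished its Prepare phase.
  Status WaitForPrepare() { return prepared_future_.get(); }

 private:
  int id_;
  std::promise<Status> prepared_;
  std::future<Status> prepared_future_;
};

int main() {
  std::atomic<bool> cancelled(false);
  std::vector<FakeInstance> instances;
  instances.reserve(3);
  for (int i = 0; i < 3; ++i) instances.emplace_back(i);

  // One worker per instance, like StartFInstances() starting a thread per
  // TPlanFragmentInstanceCtx (joined below instead of detached + refcounted).
  std::vector<std::thread> workers;
  for (FakeInstance& inst : instances) {
    workers.emplace_back([&inst, &cancelled]() {
      Status s = inst.Exec(&cancelled);
      // A failing instance initiates cancellation for all of them, mirroring
      // "if (!status.ok()) Cancel();" in ExecFInstance().
      if (!s.ok()) cancelled.store(true);
    });
  }

  // Block until every instance finished Prepare, keeping the first real error;
  // a sibling's CANCELLED never overwrites it (same rule as StartFInstances()).
  Status prepare_status;
  for (FakeInstance& inst : instances) {
    Status s = inst.WaitForPrepare();
    if (!s.ok() && (prepare_status.ok() || prepare_status.IsCancelled())) {
      prepare_status = s;
    }
  }
  std::cout << "overall prepare status: "
            << (prepare_status.ok() ? std::string("OK") : prepare_status.msg)
            << std::endl;

  for (std::thread& w : workers) w.join();
  return 0;
}

The join() calls here play the role of the reference count in the real code: refcnt_.Add(1) before the Thread is detached and ReleaseQueryState() at the end of ExecFInstance() are what guarantee the QueryState outlives every instance thread.

Separately, the runtime-filter-bank.cc hunk above swaps plain field assignments for the Thrift-generated __set_filter_id()/__set_query_id() helpers and explicitly marks __isset.bloom_filter after BloomFilter::ToThrift() has filled the field in place. In Thrift's generated C++, each __set_<field>() stores the value and flips the matching __isset flag, and optional fields are serialized only when that flag is true; presumably that is why the flag for bloom_filter has to be set by hand here. The struct below is a hand-written mock of that generated shape, purely illustrative and not Impala's real parameter struct.

#include <cstdint>
#include <iostream>

// Hand-written mock of a Thrift-generated struct with optional fields
// (illustrative only; field names chosen to echo the hunk above).
struct MockFilterParams {
  int32_t filter_id = 0;
  struct Isset {
    bool filter_id = false;
    bool bloom_filter = false;
  } __isset;

  // Thrift generates one __set_<field>() per field: it assigns the value and
  // flips the matching __isset flag in a single step.
  void __set_filter_id(int32_t id) {
    filter_id = id;
    __isset.filter_id = true;
  }
};

int main() {
  MockFilterParams a, b;

  // Plain assignment leaves the flag false; for an optional field the
  // serializer would then skip it entirely.
  a.filter_id = 7;

  // The generated setter keeps value and flag in sync.
  b.__set_filter_id(7);

  std::cout << "a marks filter_id set? " << a.__isset.filter_id << "\n"   // 0
            << "b marks filter_id set? " << b.__isset.filter_id << "\n";  // 1
  return 0;
}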
