http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h index 20a2af5..bc635b1 100644 --- a/be/src/runtime/coordinator.h +++ b/be/src/runtime/coordinator.h @@ -38,7 +38,6 @@ #include "common/status.h" #include "gen-cpp/Frontend_types.h" #include "gen-cpp/Types_types.h" -#include "runtime/query-state.h" #include "runtime/runtime-state.h" // for PartitionStatusMap; TODO: disentangle #include "scheduling/query-schedule.h" #include "util/histogram-metric.h" @@ -52,7 +51,6 @@ class DataStreamMgr; class DataSink; class RowBatch; class RowDescriptor; -class PlanFragmentExecutor; class ObjectPool; class RuntimeState; class Expr; @@ -71,8 +69,8 @@ class QueryResultSet; class MemTracker; class PlanRootSink; class FragmentInstanceState; +class QueryState; -struct DebugOptions; /// Query coordinator: handles execution of fragment instances on remote nodes, given a /// TQueryExecRequest. As part of that, it handles all interactions with the executing @@ -94,31 +92,36 @@ struct DebugOptions; /// A typical sequence of calls for a single query (calls under the same numbered /// item can happen concurrently): /// 1. client: Exec() -/// 2. client: Wait()/client: Cancel()/backend: UpdateFragmentExecStatus() -/// 3. client: GetNext()*/client: Cancel()/backend: UpdateFragmentExecStatus() +/// 2. client: Wait()/client: Cancel()/backend: UpdateBackendExecStatus() +/// 3. client: GetNext()*/client: Cancel()/backend: UpdateBackendExecStatus() /// /// The implementation ensures that setting an overall error status and initiating -/// cancellation of local and all remote fragments is atomic. +/// cancellation of all fragment instances is atomic. 
/// +/// TODO: remove TearDown() and replace with ReleaseResources(); TearDown() currently +/// also disassembles the control structures (such as the local reference to the +/// coordinator's FragmentInstanceState) /// TODO: move into separate subdirectory and move nested classes into separate files /// and unnest them +/// TODO: clean up locking behavior; in particular, clarify dependency on lock_ +/// TODO: clarify cancellation path; in particular, cancel as soon as we return +/// all results class Coordinator { // NOLINT: The member variables could be re-ordered to save space public: - Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env, - RuntimeProfile::EventSequence* events); + Coordinator(const QuerySchedule& schedule, RuntimeProfile::EventSequence* events); ~Coordinator(); /// Initiate asynchronous execution of a query with the given schedule. When it returns, /// all fragment instances have started executing at their respective backends. /// A call to Exec() must precede all other member function calls. - Status Exec(); + Status Exec() WARN_UNUSED_RESULT; /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the /// query doesn't return rows, until the query finishes or is cancelled. /// A call to Wait() must precede all calls to GetNext(). /// Multiple calls to Wait() are idempotent and it is okay to issue multiple /// Wait() calls concurrently. - Status Wait(); + Status Wait() WARN_UNUSED_RESULT; /// Fills 'results' with up to 'max_rows' rows. May return fewer than 'max_rows' /// rows, but will not return more. @@ -134,7 +137,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// /// GetNext() is not thread-safe: multiple threads must not make concurrent GetNext() /// calls (but may call any of the other member functions concurrently with GetNext()). 
- Status GetNext(QueryResultSet* results, int max_rows, bool* eos); + Status GetNext(QueryResultSet* results, int max_rows, bool* eos) WARN_UNUSED_RESULT; /// Cancel execution of query. This includes the execution of the local plan fragment, /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the @@ -142,14 +145,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Idempotent. void Cancel(const Status* cause = NULL); - /// Updates status and query execution metadata of a particular - /// fragment; if 'status' is an error status or if 'done' is true, - /// considers the plan fragment to have finished execution. Assumes - /// that calls to UpdateFragmentExecStatus() won't happen - /// concurrently for the same backend. - /// If 'status' is an error status, also cancel execution of the query via a call - /// to CancelInternal(). - Status UpdateFragmentExecStatus(const TReportExecStatusParams& params); + /// Updates execution status of a particular backend as well as Insert-related + /// status (per_partition_status_ and files_to_move_). Also updates + /// num_remaining_backends_ and cancels execution if the backend has an error status. + Status UpdateBackendExecStatus(const TReportExecStatusParams& params) + WARN_UNUSED_RESULT; /// Returns the query state. /// Only valid to call after Exec() and before TearDown(). The returned @@ -172,9 +172,9 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// the future if not all fragments have finished execution. 
RuntimeProfile* query_profile() const { return query_profile_.get(); } - const TUniqueId& query_id() const { return query_id_; } + const TUniqueId& query_id() const; - MemTracker* query_mem_tracker() const { return query_state()->query_mem_tracker(); } + MemTracker* query_mem_tracker() const; /// This is safe to call only after Wait() const PartitionStatusMap& per_partition_status() { return per_partition_status_; } @@ -198,15 +198,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Returns query_status_. Status GetStatus(); - /// Returns the exec summary. The exec summary lock must already have been taken. - /// The caller must not block while holding the lock. - const TExecSummary& exec_summary() const { - exec_summary_lock_.DCheckLocked(); - return exec_summary_; - } - - /// See the ImpalaServer class comment for the required lock acquisition order. - SpinLock& GetExecSummaryLock() const { return exec_summary_lock_; } + /// Get a copy of the current exec summary. Thread-safe. + void GetTExecSummary(TExecSummary* exec_summary); /// Receive a local filter update from a fragment instance. Aggregate that filter update /// with others for the same filter ID into a global filter. If all updates for that @@ -214,54 +207,53 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// filter to fragment instances. void UpdateFilter(const TUpdateFilterParams& params); - /// Called once the query is complete to tear down any remaining state. + /// Called once query execution is complete to tear down any remaining state. + /// TODO: change to ReleaseResources() and don't tear down control structures. 
void TearDown(); private: - class InstanceState; + class BackendState; struct FilterTarget; class FilterState; - - /// Typedef for boost utility to compute averaged stats - /// TODO: including the median doesn't compile, looks like some includes are missing - typedef boost::accumulators::accumulator_set<int64_t, - boost::accumulators::features< - boost::accumulators::tag::min, - boost::accumulators::tag::max, - boost::accumulators::tag::mean, - boost::accumulators::tag::variance> - > SummaryStats; + class FragmentStats; const QuerySchedule schedule_; - ExecEnv* exec_env_; - TUniqueId query_id_; /// copied from TQueryExecRequest; constant across all fragments - TDescriptorTable desc_tbl_; TQueryCtx query_ctx_; /// copied from TQueryExecRequest, governs when to call ReportQuerySummary TStmtType::type stmt_type_; - /// map from id of a scan node to a specific counter in the node's profile - typedef std::map<PlanNodeId, RuntimeProfile::Counter*> CounterMap; + /// BackendStates for all execution backends, including the coordinator. + /// All elements are non-nullptr. Owned by obj_pool(). Populated by + /// InitBackendExec(). + std::vector<BackendState*> backend_states_; - /// Struct for per fragment instance counters that will be aggregated by the coordinator. - struct FragmentInstanceCounters { - /// Throughput counters per node - CounterMap throughput_counters; + // index into backend_states_ for coordinator fragment; -1 if no coordinator fragment + int coord_backend_idx_ = -1; - /// Total finished scan ranges per node - CounterMap scan_ranges_complete_counters; - }; + /// The QueryState for this coordinator. Set in Exec(). Released in TearDown(). + QueryState* query_state_ = nullptr; + + /// Non-null if and only if the query produces results for the client; i.e. is of + /// TStmtType::QUERY. Coordinator uses these to pull results from plan tree and return + /// them to the client in GetNext(), and also to access the fragment instance's runtime + /// state. 
+ /// + /// Result rows are materialized by this fragment instance in its own thread. They are + /// materialized into a QueryResultSet provided to the coordinator during GetNext(). + /// + /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied + /// reference of QueryState released) in TearDown(). + FragmentInstanceState* coord_instance_ = nullptr; - /// InstanceStates for all fragment instances, including that of the coordinator - /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in - /// StartFInstances(). - std::vector<InstanceState*> fragment_instance_states_; + /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when + /// GetNext() hits eos. + PlanRootSink* coord_sink_ = nullptr; /// True if the query needs a post-execution step to tidy up - bool needs_finalization_; + bool needs_finalization_ = false; /// Only valid if needs_finalization is true TFinalizeParams finalize_params_; @@ -269,17 +261,52 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// ensures single-threaded execution of Wait(); must not hold lock_ when acquiring this boost::mutex wait_lock_; - bool has_called_wait_; // if true, Wait() was called; protected by wait_lock_ + bool has_called_wait_ = false; // if true, Wait() was called; protected by wait_lock_ /// Keeps track of number of completed ranges and total scan ranges. ProgressUpdater progress_; + /// Total number of filter updates received (always 0 if filter mode is not + /// GLOBAL). Excludes repeated broadcast filter updates. Set in Exec(). + RuntimeProfile::Counter* filter_updates_received_ = nullptr; + + /// The filtering mode for this query. Set in constructor. + TRuntimeFilterMode::type filter_mode_; + + /// Tracks the memory consumed by runtime filters during aggregation. Child of + /// the query mem tracker in 'query_state_' and set in Exec(). 
+ std::unique_ptr<MemTracker> filter_mem_tracker_; + + /// Object pool owned by the coordinator. + boost::scoped_ptr<ObjectPool> obj_pool_; + + /// Execution summary for a single query. + /// A wrapper around TExecSummary, with supporting structures. + struct ExecSummary { + TExecSummary thrift_exec_summary; + + /// See the ImpalaServer class comment for the required lock acquisition order. + /// The caller must not block while holding the lock. + SpinLock lock; + + /// A mapping of plan node ids to index into thrift_exec_summary.nodes + boost::unordered_map<TPlanNodeId, int> node_id_to_idx_map; + + void Init(const QuerySchedule& query_schedule); + }; + + ExecSummary exec_summary_; + + /// Aggregate counters for the entire query. + boost::scoped_ptr<RuntimeProfile> query_profile_; + /// Protects all fields below. This is held while making RPCs, so this lock should /// only be acquired if the acquiring thread is prepared to wait for a significant /// time. + /// TODO: clarify to what extent the fields below need to be protected by lock_ /// Lock ordering is /// 1. lock_ - /// 2. InstanceState::lock_ + /// 2. BackendState::lock_ boost::mutex lock_; /// Overall status of the entire query; set to the first reported fragment error @@ -289,54 +316,17 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// If true, the query is done returning all results. It is possible that the /// coordinator still needs to wait for cleanup on remote fragments (e.g. queries /// with limit) - /// Once this is set to true, errors from remote fragments are ignored. - bool returned_all_results_; - - /// The QueryState for this coordinator. Set in Exec(). Released in TearDown(). - QueryState* query_state_; - - /// Non-null if and only if the query produces results for the client; i.e. is of - /// TStmtType::QUERY. 
Coordinator uses these to pull results from plan tree and return - /// them to the client in GetNext(), and also to access the fragment instance's runtime - /// state. - /// - /// Result rows are materialized by this fragment instance in its own thread. They are - /// materialized into a QueryResultSet provided to the coordinator during GetNext(). - /// - /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied - /// reference of QueryState released) in TearDown(). - FragmentInstanceState* coord_instance_ = nullptr; - - /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when - /// GetNext() hits eos. - PlanRootSink* coord_sink_ = nullptr; - - /// owned by plan root, which resides in runtime_state_'s pool - const RowDescriptor* row_desc_; - - /// Returns a local object pool. - ObjectPool* obj_pool() { return obj_pool_.get(); } - - PlanFragmentExecutor* executor(); - - // Sets the TDescriptorTable(s) for the current fragment. - void SetExecPlanDescriptorTable(const TPlanFragment& fragment, - TExecPlanFragmentParams* rpc_params); - - /// True if execution has completed, false otherwise. - bool execution_completed_; - - /// Number of remote fragments that have completed - int num_remote_fragements_complete_; + /// Once this is set to true, errors from execution backends are ignored. + bool returned_all_results_ = false; /// If there is no coordinator fragment, Wait() simply waits until all - /// backends report completion by notifying on instance_completion_cv_. + /// backends report completion by notifying on backend_completion_cv_. /// Tied to lock_. - boost::condition_variable instance_completion_cv_; + boost::condition_variable backend_completion_cv_; /// Count of the number of backends for which done != true. 
When this /// hits 0, any Wait()'ing thread is notified - int num_remaining_fragment_instances_; + int num_remaining_backends_ = 0; /// The following two structures, partition_row_counts_ and files_to_move_ are filled in /// as the query completes, and track the results of INSERT queries that alter the @@ -353,64 +343,15 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// empty string for the destination means that a file is to be deleted. FileMoveMap files_to_move_; - /// Object pool owned by the coordinator. Any executor will have its own pool. - boost::scoped_ptr<ObjectPool> obj_pool_; - - /// Execution summary for this query. - /// See the ImpalaServer class comment for the required lock acquisition order. - mutable SpinLock exec_summary_lock_; - TExecSummary exec_summary_; - - /// A mapping of plan node ids to index into exec_summary_.nodes - boost::unordered_map<TPlanNodeId, int> plan_node_id_to_summary_map_; - - /// Aggregate counters for the entire query. - boost::scoped_ptr<RuntimeProfile> query_profile_; - - /// Event timeline for this query. Unowned. - RuntimeProfile::EventSequence* query_events_; - - /// Per fragment profile information - struct PerFragmentProfileData { - /// Averaged profile for this fragment. Stored in obj_pool. - /// The counters in this profile are averages (type AveragedCounter) of the - /// counters in the fragment instance profiles. - /// Note that the individual fragment instance profiles themselves are stored and - /// displayed as children of the root_profile below. - RuntimeProfile* averaged_profile; - - /// Number of instances running this fragment. - int num_instances; + /// Event timeline for this query. Not owned. + RuntimeProfile::EventSequence* query_events_ = nullptr; - /// Root profile for all fragment instances for this fragment - RuntimeProfile* root_profile; + /// Indexed by fragment idx (TPlanFragment.idx). Filled in InitFragmentStats(), + /// elements live in obj_pool(). 
+ std::vector<FragmentStats*> fragment_stats_; - /// Bytes assigned for instances of this fragment - SummaryStats bytes_assigned; - - /// Completion times for instances of this fragment - SummaryStats completion_times; - - /// Execution rates for instances of this fragment - SummaryStats rates; - - PerFragmentProfileData() - : averaged_profile(nullptr), num_instances(-1), root_profile(nullptr) {} - }; - - /// This is indexed by fragment idx (TPlanFragment.idx). - /// This array is only modified at coordinator startup and query completion and - /// does not need locks. - std::vector<PerFragmentProfileData> fragment_profiles_; - - /// Throughput counters for the coordinator fragment - FragmentInstanceCounters coordinator_counters_; - - /// The set of hosts that the query will run on. Populated in Exec. - boost::unordered_set<TNetworkAddress> unique_hosts_; - - /// Total time spent in finalization (typically 0 except for INSERT into hdfs tables) - RuntimeProfile::Counter* finalization_timer_; + /// total time spent in finalization (typically 0 except for INSERT into hdfs tables) + RuntimeProfile::Counter* finalization_timer_ = nullptr; /// Barrier that is released when all calls to ExecRemoteFragment() have /// returned, successfully or not. Initialised during Exec(). @@ -425,54 +366,20 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// Set to true when all calls to UpdateFilterRoutingTable() have finished, and it's /// safe to concurrently read from filter_routing_table_. - bool filter_routing_table_complete_; - - /// Total number of filter updates received (always 0 if filter mode is not - /// GLOBAL). Excludes repeated broadcast filter updates. - RuntimeProfile::Counter* filter_updates_received_; - - /// The filtering mode for this query. Set in constructor. - TRuntimeFilterMode::type filter_mode_; - - /// Tracks the memory consumed by runtime filters during aggregation. Child of - /// the query mem tracker in 'query_state_'. 
- std::unique_ptr<MemTracker> filter_mem_tracker_; + bool filter_routing_table_complete_ = false; /// True if and only if TearDown() has been called. - bool torn_down_; + bool torn_down_ = false; + + /// Returns a local object pool. + ObjectPool* obj_pool() { return obj_pool_.get(); } /// Returns a pretty-printed table of the current filter state. std::string FilterDebugString(); - /// Sets 'filter_routing_table_complete_' and prints the table to the profile and log. - void MarkFilterRoutingTableComplete(); - - /// Fill in rpc_params based on params. - void SetExecPlanFragmentParams( - const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params); - - /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from - /// multiple threads. - void ExecRemoteFInstance( - const FInstanceExecParams& exec_params, const DebugOptions* debug_options); - - /// Determine fragment number, given fragment id. - int GetFragmentNum(const TUniqueId& fragment_id); - - /// Print hdfs split size stats to VLOG_QUERY and details to VLOG_FILE - /// Attaches split size summary to the appropriate runtime profile - void PrintFragmentInstanceInfo(); - - /// Collect scan node counters from the profile. - /// Assumes lock protecting profile and result is held. - void CollectScanNodeCounters(RuntimeProfile*, FragmentInstanceCounters* result); - - /// Runs cancel logic. Assumes that lock_ is held. - void CancelInternal(); - /// Cancels all fragment instances. Assumes that lock_ is held. This may be called when /// the query is not being cancelled in the case where the query limit is reached. - void CancelFragmentInstances(); + void CancelInternal(); /// Acquires lock_ and updates query_status_ with 'status' if it's not already /// an error status, and returns the current query_status_. 
@@ -480,45 +387,31 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save /// failed_fragment is the fragment_id that has failed, used for error reporting along /// with instance_hostname. Status UpdateStatus(const Status& status, const TUniqueId& failed_fragment, - const std::string& instance_hostname); + const std::string& instance_hostname) WARN_UNUSED_RESULT; + + /// Update per_partition_status_ and files_to_move_. + void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status); - /// Returns only when either all fragment instances have reported success or the query + /// Returns only when either all execution backends have reported success or the query /// is in error. Returns the status of the query. /// It is safe to call this concurrently, but any calls must be made only after Exec(). - /// WaitForAllInstances may be called before Wait(), but note that Wait() guarantees - /// that any coordinator fragment has finished, which this method does not. - Status WaitForAllInstances(); + Status WaitForBackendCompletion() WARN_UNUSED_RESULT; - /// Perform any post-query cleanup required. Called by Wait() only after all fragment - /// instances have returned, or if the query has failed, in which case it only cleans up - /// temporary data rather than finishing the INSERT in flight. - Status FinalizeQuery(); - - /// Moves all temporary staging files to their final destinations. - Status FinalizeSuccessfulInsert(); - - /// Initializes the structures in fragment_profiles_. Must be called before RPCs to - /// start remote fragments. - void InitExecProfiles(); - - /// Initialize the structures to collect execution summary of every plan node - /// (exec_summary_ and plan_node_id_to_summary_map_) - void InitExecSummary(); + /// Initializes fragment_stats_ and query_profile_. Must be called before + /// InitBackendStates(). + void InitFragmentStats(); - /// Update fragment profile information from a fragment instance state. 
- void UpdateAverageProfile(InstanceState* instance_state); + /// Populates backend_states_ based on schedule_.fragment_exec_params(). + /// BackendState depends on fragment_stats_, which is why InitFragmentStats() + /// must be called before this function. + void InitBackendStates(); - /// Compute the summary stats (completion_time and rates) - /// for an individual fragment_profile_ based on the specified instance state. - void ComputeFragmentSummaryStats(InstanceState* instance_state); + /// Computes execution summary info strings for fragment_stats_ and query_profile_. + /// This is assumed to be called at the end of a query -- remote fragments' + /// profiles must not be updated while this is running. + void ComputeQuerySummary(); - /// Outputs aggregate query profile summary. This is assumed to be called at the end of - /// a query -- remote fragments' profiles must not be updated while this is running. - void ReportQuerySummary(); - - /// Populates the summary execution stats from the profile. Can only be called when the - /// query is done. - void UpdateExecSummary(const InstanceState& instance_state); + /// TODO: move the next 3 functions into a separate class /// Determines what the permissions of directories created by INSERT statements should /// be if permission inheritance is enabled. Populates a map from all prefixes of @@ -534,19 +427,26 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str, PermissionCache* permissions_cache); - /// Starts all fragment instances contained in the schedule by issuing RPCs in - /// parallel and then waiting for all of the RPCs to complete. Also sets up and - /// registers the state for all fragment instances. - void StartFInstances(); + /// Moves all temporary staging files to their final destinations. + Status FinalizeSuccessfulInsert() WARN_UNUSED_RESULT; + + /// Perform any post-query cleanup required. 
Called by Wait() only after all fragment + /// instances have returned, or if the query has failed, in which case it only cleans up + /// temporary data rather than finishing the INSERT in flight. + Status FinalizeQuery() WARN_UNUSED_RESULT; + + /// Populates backend_states_, starts query execution at all backends in parallel, and + /// blocks until startup completes. + void StartBackendExec(); - /// Calls CancelInternal() and returns an error if there was any error starting the - /// fragments. + /// Calls CancelInternal() and returns an error if there was any error starting + /// backend execution. /// Also updates query_profile_ with the startup latency histogram. - Status FinishInstanceStartup(); + Status FinishBackendStartup() WARN_UNUSED_RESULT; /// Build the filter routing table by iterating over all plan nodes and collecting the /// filters that they either produce or consume. - void UpdateFilterRoutingTable(const FragmentExecParams& fragment_params); + void InitFilterRoutingTable(); }; }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/data-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc index f2785cd..4cb2bf0 100644 --- a/be/src/runtime/data-stream-test.cc +++ b/be/src/runtime/data-stream-test.cc @@ -81,20 +81,16 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf { ImpalaTestBackend(DataStreamMgr* stream_mgr): mgr_(stream_mgr) {} virtual ~ImpalaTestBackend() {} - virtual void ExecPlanFragment( - TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) {} - - virtual void ReportExecStatus( - TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {} - - virtual void CancelPlanFragment( - TCancelPlanFragmentResult& return_val, const TCancelPlanFragmentParams& params) {} - - virtual void UpdateFilter( - TUpdateFilterResult& return_val, const TUpdateFilterParams& params) {} - - virtual void PublishFilter( - TPublishFilterResult& return_val, const TPublishFilterParams& params) {} + virtual void ExecQueryFInstances(TExecQueryFInstancesResult& return_val, + const TExecQueryFInstancesParams& params) {} + virtual void CancelQueryFInstances(TCancelQueryFInstancesResult& return_val, + const TCancelQueryFInstancesParams& params) {} + virtual void ReportExecStatus(TReportExecStatusResult& return_val, + const TReportExecStatusParams& params) {} + virtual void UpdateFilter(TUpdateFilterResult& return_val, + const TUpdateFilterParams& params) {} + virtual void PublishFilter(TPublishFilterResult& return_val, + const TPublishFilterParams& params) {} virtual void TransmitData( TTransmitDataResult& return_val, const TTransmitDataParams& params) { @@ -116,7 +112,7 @@ class DataStreamTest : public testing::Test { DataStreamTest() : next_val_(0) { // Initialize MemTrackers and RuntimeState for use by the data stream receiver. 
exec_env_.InitForFeTests(); - runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_, "test-pool")); + runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_)); // Stop tests that rely on mismatched sender / receiver pairs timing out from failing. FLAGS_datastream_sender_timeout_ms = 250; @@ -275,8 +271,7 @@ class DataStreamTest : public testing::Test { slot_desc.__set_nullIndicatorBit(-1); slot_desc.__set_slotIdx(0); thrift_desc_tbl.slotDescriptors.push_back(slot_desc); - EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, &desc_tbl_)); - runtime_state_->set_desc_tbl(desc_tbl_); + EXPECT_OK(DescriptorTbl::Create(&obj_pool_, thrift_desc_tbl, nullptr, &desc_tbl_)); vector<TTupleId> row_tids; row_tids.push_back(0); @@ -480,8 +475,7 @@ class DataStreamTest : public testing::Test { void Sender( int sender_num, int channel_buffer_size, TPartitionType::type partition_type) { - RuntimeState state(TQueryCtx(), &exec_env_, "test-pool"); - state.set_desc_tbl(desc_tbl_); + RuntimeState state(TQueryCtx(), &exec_env_, desc_tbl_); VLOG_QUERY << "create sender " << sender_num; const TDataStreamSink& sink = GetSink(partition_type); DataStreamSender sender( @@ -593,8 +587,7 @@ TEST_F(DataStreamTest, BasicTest) { // // TODO: Make lifecycle requirements more explicit. TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) { - scoped_ptr<RuntimeState> runtime_state( - new RuntimeState(TQueryCtx(), &exec_env_, "test-pool")); + scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_)); scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver")); // Start just one receiver. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/debug-options.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/debug-options.cc b/be/src/runtime/debug-options.cc new file mode 100644 index 0000000..7a63742 --- /dev/null +++ b/be/src/runtime/debug-options.cc @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <boost/algorithm/string.hpp> + +#include "runtime/debug-options.h" +#include "common/logging.h" + +using namespace impala; +using namespace std; +using namespace boost; + +DebugOptions::DebugOptions(const TQueryOptions& query_options) + : instance_idx_(-1), + node_id_(-1), + action_(TDebugAction::WAIT), + phase_(TExecNodePhase::INVALID) { + if (!query_options.__isset.debug_action || query_options.debug_action.empty()) { + // signal not set + return; + } + + vector<string> components; + split(components, query_options.debug_action, is_any_of(":"), token_compress_on); + if (components.size() < 3 || components.size() > 4) return; + if (components.size() == 3) { + instance_idx_ = -1; + node_id_ = atoi(components[0].c_str()); + phase_ = GetExecNodePhase(components[1]); + action_ = GetDebugAction(components[2]); + } else { + instance_idx_ = atoi(components[0].c_str()); + node_id_ = atoi(components[1].c_str()); + phase_ = GetExecNodePhase(components[2]); + action_ = GetDebugAction(components[3]); + } + DCHECK(!(phase_ == TExecNodePhase::CLOSE && action_ == TDebugAction::WAIT)) + << "Do not use CLOSE:WAIT debug actions because nodes cannot be cancelled in " + "Close()"; +} + +TExecNodePhase::type DebugOptions::GetExecNodePhase(const string& key) { + map<int, const char*>::const_iterator entry = + _TExecNodePhase_VALUES_TO_NAMES.begin(); + for (; entry != _TExecNodePhase_VALUES_TO_NAMES.end(); ++entry) { + if (iequals(key, (*entry).second)) { + return static_cast<TExecNodePhase::type>(entry->first); + } + } + return TExecNodePhase::INVALID; +} + +TDebugAction::type DebugOptions::GetDebugAction(const string& key) { + for (auto entry: _TDebugAction_VALUES_TO_NAMES) { + if (iequals(key, entry.second)) { + return static_cast<TDebugAction::type>(entry.first); + } + } + return TDebugAction::WAIT; +} + + +TDebugOptions DebugOptions::ToThrift() const { + TDebugOptions result; + result.__set_node_id(node_id_); + result.__set_phase(phase_); + result.__set_action(action_); 
+ return result; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/debug-options.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/debug-options.h b/be/src/runtime/debug-options.h new file mode 100644 index 0000000..95f1f65 --- /dev/null +++ b/be/src/runtime/debug-options.h @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_DEBUG_OPTIONS_H +#define IMPALA_RUNTIME_DEBUG_OPTIONS_H + +#include <vector> +#include <string> + +#include "gen-cpp/Frontend_types.h" +#include "gen-cpp/Types_types.h" + +namespace impala { + +// Container for debug options in TPlanFragmentInstanceCtx (debug_node, debug_action, +// debug_phase). 
+// TODO: move to subdirectory /coordinator +class DebugOptions { + public: + DebugOptions(const TQueryOptions& query_options); + TDebugOptions ToThrift() const; + + /// query-wide fragment instance index; -1 if not set + int instance_idx() const { return instance_idx_; } + + /// -1 if no debug options set + int node_id() const { return node_id_; } + + TDebugAction::type action() const { return action_; } + TExecNodePhase::type phase() const { return phase_; } + + private: + int instance_idx_; + int node_id_; + TDebugAction::type action_; + TExecNodePhase::type phase_; // INVALID: debug options invalid + + static TExecNodePhase::type GetExecNodePhase(const std::string& key); + static TDebugAction::type GetDebugAction(const std::string& key); +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/descriptors.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc index 709ddb1..bb7c1d0 100644 --- a/be/src/runtime/descriptors.cc +++ b/be/src/runtime/descriptors.cc @@ -480,7 +480,7 @@ string RowDescriptor::DebugString() const { } Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl, - DescriptorTbl** tbl) { + MemTracker* mem_tracker, DescriptorTbl** tbl) { *tbl = pool->Add(new DescriptorTbl()); // deserialize table descriptors first, they are being referenced by tuple descriptors for (size_t i = 0; i < thrift_tbl.tableDescriptors.size(); ++i) { @@ -489,6 +489,21 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tb switch (tdesc.tableType) { case TTableType::HDFS_TABLE: desc = pool->Add(new HdfsTableDescriptor(tdesc, pool)); + + if (mem_tracker != nullptr) { + // prepare and open partition exprs + const HdfsTableDescriptor* hdfs_tbl = + static_cast<const HdfsTableDescriptor*>(desc); + for (const auto& part_entry : hdfs_tbl->partition_descriptors()) { + // TODO: 
RowDescriptor should arguably be optional in Prepare for known + // literals Partition exprs are not used in the codegen case. Don't codegen + // them. + RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(), + nullptr, RowDescriptor(), mem_tracker)); + RETURN_IF_ERROR(Expr::Open( + part_entry.second->partition_key_value_ctxs(), nullptr)); + } + } break; case TTableType::HBASE_TABLE: desc = pool->Add(new HBaseTableDescriptor(tdesc)); @@ -530,27 +545,14 @@ Status DescriptorTbl::Create(ObjectPool* pool, const TDescriptorTable& thrift_tb return Status::OK(); } -Status DescriptorTbl::PrepareAndOpenPartitionExprs(RuntimeState* state) const { - for (const auto& tbl_entry : tbl_desc_map_) { - if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue; - HdfsTableDescriptor* hdfs_tbl = static_cast<HdfsTableDescriptor*>(tbl_entry.second); - for (const auto& part_entry : hdfs_tbl->partition_descriptors()) { - // TODO: RowDescriptor should arguably be optional in Prepare for known literals - // Partition exprs are not used in the codegen case. Don't codegen them. 
- RETURN_IF_ERROR(Expr::Prepare(part_entry.second->partition_key_value_ctxs(), state, - RowDescriptor(), state->instance_mem_tracker())); - RETURN_IF_ERROR(Expr::Open(part_entry.second->partition_key_value_ctxs(), state)); - } - } - return Status::OK(); -} - -void DescriptorTbl::ClosePartitionExprs(RuntimeState* state) const { - for (const auto& tbl_entry: tbl_desc_map_) { - if (tbl_entry.second->type() != TTableType::HDFS_TABLE) continue; - HdfsTableDescriptor* hdfs_tbl = static_cast<HdfsTableDescriptor*>(tbl_entry.second); +void DescriptorTbl::ReleaseResources() { + // close partition exprs of hdfs tables + for (auto entry: tbl_desc_map_) { + if (entry.second->type() != TTableType::HDFS_TABLE) continue; + const HdfsTableDescriptor* hdfs_tbl = + static_cast<const HdfsTableDescriptor*>(entry.second); for (const auto& part_entry: hdfs_tbl->partition_descriptors()) { - Expr::Close(part_entry.second->partition_key_value_ctxs(), state); + Expr::Close(part_entry.second->partition_key_value_ctxs(), nullptr); } } } @@ -585,7 +587,6 @@ SlotDescriptor* DescriptorTbl::GetSlotDescriptor(SlotId id) const { } } -// return all registered tuple descriptors void DescriptorTbl::GetTupleDescs(vector<TupleDescriptor*>* descs) const { descs->clear(); for (TupleDescriptorMap::const_iterator i = tuple_desc_map_.begin(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/descriptors.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 26f58d3..06c65c5 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -46,6 +46,7 @@ class LlvmBuilder; class LlvmCodeGen; class ObjectPool; class RuntimeState; +class MemTracker; class TDescriptorTable; class TSlotDescriptor; class TTable; @@ -283,7 +284,6 @@ class HdfsPartitionDescriptor { /// The Prepare()/Open()/Close() cycle is controlled by the containing descriptor table /// because the same 
partition descriptor may be used by multiple exec nodes with /// different lifetimes. - /// TODO: Move these into the new query-wide state, indexed by partition id. std::vector<ExprContext*> partition_key_value_ctxs_; /// The format (e.g. text, sequence file etc.) of data in the files in this partition @@ -461,15 +461,16 @@ class TupleDescriptor { class DescriptorTbl { public: /// Creates a descriptor tbl within 'pool' from thrift_tbl and returns it via 'tbl'. + /// If mem_tracker_ != nullptr, also opens partition exprs for hdfs tables (and does + /// memory allocation against that tracker). /// Returns OK on success, otherwise error (in which case 'tbl' will be unset). + /// TODO: when cleaning up ExprCtx, remove the need to pass in a memtracker for literal + /// exprs that don't require additional memory at runtime. static Status Create(ObjectPool* pool, const TDescriptorTable& thrift_tbl, - DescriptorTbl** tbl); + MemTracker* mem_tracker, DescriptorTbl** tbl); - /// Prepares and opens partition exprs of Hdfs tables. - Status PrepareAndOpenPartitionExprs(RuntimeState* state) const; - - /// Closes partition exprs of Hdfs tables. - void ClosePartitionExprs(RuntimeState* state) const; + /// Free memory allocated in Create(). 
+ void ReleaseResources(); TableDescriptor* GetTableDescriptor(TableId id) const; TupleDescriptor* GetTupleDescriptor(TupleId id) const; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index e5d0c0b..6eeeaee 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -105,6 +105,7 @@ DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "Deprecated"); DEFINE_int32(resource_broker_send_timeout, 0, "Deprecated"); DEFINE_int32(resource_broker_recv_timeout, 0, "Deprecated"); +// TODO-MT: rename or retire DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads available to " "start fragments on remote Impala daemons."); @@ -150,7 +151,7 @@ ExecEnv::ExecEnv() tmp_file_mgr_(new TmpFileMgr), request_pool_service_(new RequestPoolService(metrics_.get())), frontend_(new Frontend()), - fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", + exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool", "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), query_exec_mgr_(new QueryExecMgr()), @@ -215,7 +216,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)), tmp_file_mgr_(new TmpFileMgr), frontend_(new Frontend()), - fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc", + exec_rpc_thread_pool_(new CallableThreadPool("exec-rpc-pool", "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), query_exec_mgr_(new QueryExecMgr()), 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 87824ba..43b0e0b 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -88,9 +88,7 @@ class ExecEnv { ThreadResourceMgr* thread_mgr() { return thread_mgr_.get(); } HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); } TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); } - CallableThreadPool* fragment_exec_thread_pool() { - return fragment_exec_thread_pool_.get(); - } + CallableThreadPool* exec_rpc_thread_pool() { return exec_rpc_thread_pool_.get(); } ImpalaServer* impala_server() { return impala_server_; } Frontend* frontend() { return frontend_.get(); } RequestPoolService* request_pool_service() { return request_pool_service_.get(); } @@ -141,7 +139,7 @@ class ExecEnv { boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_; boost::scoped_ptr<RequestPoolService> request_pool_service_; boost::scoped_ptr<Frontend> frontend_; - boost::scoped_ptr<CallableThreadPool> fragment_exec_thread_pool_; + boost::scoped_ptr<CallableThreadPool> exec_rpc_thread_pool_; boost::scoped_ptr<CallableThreadPool> async_rpc_pool_; boost::scoped_ptr<QueryExecMgr> query_exec_mgr_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index b0adde9..9376767 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -18,150 +18,442 @@ #include "runtime/fragment-instance-state.h" +#include <sstream> +#include <thrift/protocol/TDebugProtocol.h> #include <boost/thread/locks.hpp> +#include <boost/thread/thread_time.hpp> #include 
<boost/thread/lock_guard.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp> +#include "common/names.h" +#include "codegen/llvm-codegen.h" +#include "exec/plan-root-sink.h" +#include "exec/exec-node.h" +#include "exec/hdfs-scan-node-base.h" // for PerVolumeStats +#include "exec/exchange-node.h" +#include "exec/scan-node.h" #include "runtime/exec-env.h" #include "runtime/backend-client.h" #include "runtime/runtime-filter-bank.h" #include "runtime/client-cache.h" #include "runtime/runtime-state.h" #include "runtime/query-state.h" +#include "runtime/query-state.h" +#include "runtime/data-stream-mgr.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "scheduling/query-schedule.h" +#include "util/debug-util.h" +#include "util/container-util.h" +#include "util/periodic-counter-updater.h" #include "gen-cpp/ImpalaInternalService_types.h" -#include "common/names.h" +DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds"); using namespace impala; +using namespace apache::thrift; + +const string FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage"; + +static const string OPEN_TIMER_NAME = "OpenTime"; +static const string PREPARE_TIMER_NAME = "PrepareTime"; +static const string EXEC_TIMER_NAME = "ExecTime"; + FragmentInstanceState::FragmentInstanceState( QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, - const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& desc_tbl) + const TPlanFragmentInstanceCtx& instance_ctx) : query_state_(query_state), fragment_ctx_(fragment_ctx), - instance_ctx_(instance_ctx), - desc_tbl_(desc_tbl), - executor_( - [this](const Status& status, RuntimeProfile* profile, bool done) { - ReportStatusCb(status, profile, done); - }) { + instance_ctx_(instance_ctx) { +} + +Status FragmentInstanceState::Exec() { + Status status = Prepare(); + DCHECK(runtime_state_ != nullptr); // we need to guarantee at least that + 
prepared_promise_.Set(status); + if (!status.ok()) { + opened_promise_.Set(status); + goto done; + } + status = Open(); + opened_promise_.Set(status); + if (!status.ok()) goto done; + + { + // Must go out of scope before Finalize(), otherwise counter will not be + // updated by time final profile is sent. + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME)); + status = ExecInternal(); + } + +done: + // call this before Close() to make sure the thread token got released + Finalize(status); + Close(); + return status; } -Status FragmentInstanceState::UpdateStatus(const Status& status) { - lock_guard<mutex> l(status_lock_); - if (!status.ok() && exec_status_.ok()) exec_status_ = status; - return exec_status_; +void FragmentInstanceState::Cancel() { + WaitForPrepare(); // make sure Prepare() finished + + // Ensure that the sink is closed from both sides. Although in ordinary executions we + // rely on the consumer to do this, in error cases the consumer may not be able to send + // CloseConsumer() (see IMPALA-4348 for an example). + if (root_sink_ != nullptr) root_sink_->CloseConsumer(); + + DCHECK(runtime_state_ != nullptr); + runtime_state_->set_is_cancelled(); + runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id()); } -Status FragmentInstanceState::Cancel() { - lock_guard<mutex> l(status_lock_); - RETURN_IF_ERROR(exec_status_); - executor_.Cancel(); +Status FragmentInstanceState::Prepare() { + DCHECK(!prepared_promise_.IsSet()); + + VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx_); + + // Do not call RETURN_IF_ERROR or explicitly return before this line, + // runtime_state_ != nullptr is a postcondition of this function. + runtime_state_ = obj_pool()->Add(new RuntimeState( + query_state_, fragment_ctx_, instance_ctx_, ExecEnv::GetInstance())); + + // total_time_counter() is in the runtime_state_ so start it up now. 
+ SCOPED_TIMER(profile()->total_time_counter()); + timings_profile_ = obj_pool()->Add( + new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings")); + profile()->AddChild(timings_profile_); + SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME)); + + // TODO: move this into a RuntimeState::Init() + RETURN_IF_ERROR(runtime_state_->CreateBlockMgr()); + runtime_state_->InitFilterBank(); + + // Reserve one main thread from the pool + runtime_state_->resource_pool()->AcquireThreadToken(); + + avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens", + bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads), + runtime_state_->resource_pool())); + mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage", + TUnit::BYTES, + bind<int64_t>(mem_fn(&MemTracker::consumption), + runtime_state_->instance_mem_tracker())); + thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage", + TUnit::UNIT, + bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads), + runtime_state_->resource_pool())); + + // set up plan + RETURN_IF_ERROR(ExecNode::CreateTree( + runtime_state_, fragment_ctx_.fragment.plan, query_state_->desc_tbl(), + &exec_tree_)); + runtime_state_->set_fragment_root_id(exec_tree_->id()); + if (instance_ctx_.__isset.debug_options) { + ExecNode::SetDebugOptions(instance_ctx_.debug_options, exec_tree_); + } + + // set #senders of exchange nodes before calling Prepare() + vector<ExecNode*> exch_nodes; + exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes); + for (ExecNode* exch_node : exch_nodes) { + DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE); + int num_senders = + FindWithDefault(instance_ctx_.per_exch_num_senders, exch_node->id(), 0); + DCHECK_GT(num_senders, 0); + static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders); + } + + // set scan ranges + vector<ExecNode*> scan_nodes; + vector<TScanRangeParams> no_scan_ranges; + 
exec_tree_->CollectScanNodes(&scan_nodes); + for (ExecNode* scan_node: scan_nodes) { + const vector<TScanRangeParams>& scan_ranges = FindWithDefault( + instance_ctx_.per_node_scan_ranges, scan_node->id(), no_scan_ranges); + static_cast<ScanNode*>(scan_node)->SetScanRanges(scan_ranges); + } + + RuntimeProfile::Counter* prepare_timer = + ADD_CHILD_TIMER(timings_profile_, "ExecTreePrepareTime", PREPARE_TIMER_NAME); + { + SCOPED_TIMER(prepare_timer); + RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_)); + } + PrintVolumeIds(); + + // prepare sink_ + DCHECK(fragment_ctx_.fragment.__isset.output_sink); + RETURN_IF_ERROR( + DataSink::Create( + obj_pool(), fragment_ctx_, instance_ctx_, exec_tree_->row_desc(), &sink_)); + RETURN_IF_ERROR(sink_->Prepare(runtime_state_, runtime_state_->instance_mem_tracker())); + RuntimeProfile* sink_profile = sink_->profile(); + if (sink_profile != nullptr) profile()->AddChild(sink_profile); + + if (fragment_ctx_.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) { + root_sink_ = reinterpret_cast<PlanRootSink*>(sink_); + // Release the thread token on the root fragment instance. This fragment spends most + // of the time waiting and doing very little work. Holding on to the token causes + // underutilization of the machine. If there are 12 queries on this node, that's 12 + // tokens reserved for no reason. + ReleaseThreadToken(); + } + + if (runtime_state_->ShouldCodegen()) { + RETURN_IF_ERROR(runtime_state_->CreateCodegen()); + exec_tree_->Codegen(runtime_state_); + // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed, + // ScalarFnCall has no fall back to interpretation when codegen fails so propagates + // the error status for now. 
+ RETURN_IF_ERROR(runtime_state_->CodegenScalarFns()); + } + + // set up profile counters + profile()->AddChild(exec_tree_->runtime_profile()); + rows_produced_counter_ = + ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT); + per_host_mem_usage_ = + ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES); + + row_batch_.reset( + new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(), + runtime_state_->instance_mem_tracker())); + VLOG(2) << "plan_root=\n" << exec_tree_->DebugString(); + + // We need to start the profile-reporting thread before calling Open(), + // since it may block. + if (FLAGS_status_report_interval > 0) { + unique_lock<mutex> l(report_thread_lock_); + report_thread_.reset( + new Thread("plan-fragment-executor", "report-profile", + &FragmentInstanceState::ReportProfileThread, this)); + // Make sure the thread started up, otherwise ReportProfileThread() might get into + // a race with StopReportThread(). + while (!report_thread_active_) report_thread_started_cv_.wait(l); + } + return Status::OK(); } -void FragmentInstanceState::Exec() { - Status status = - executor_.Prepare(query_state_, desc_tbl_, fragment_ctx_, instance_ctx_); - prepare_promise_.Set(status); - if (status.ok()) { - if (executor_.Open().ok()) { - executor_.Exec(); - } +Status FragmentInstanceState::Open() { + DCHECK(prepared_promise_.IsSet()); + DCHECK(!opened_promise_.IsSet()); + SCOPED_TIMER(profile()->total_time_counter()); + SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME)); + SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); + + // codegen prior to exec_tree_->Open() + if (runtime_state_->ShouldCodegen()) { + LlvmCodeGen* codegen = runtime_state_->codegen(); + DCHECK(codegen != nullptr); + RETURN_IF_ERROR(codegen->FinalizeModule()); + } + + { + SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME)); + RETURN_IF_ERROR(exec_tree_->Open(runtime_state_)); } - executor_.Close(); -} - -void 
FragmentInstanceState::ReportStatusCb( - const Status& status, RuntimeProfile* profile, bool done) { - DCHECK(status.ok() || done); // if !status.ok() => done - Status exec_status = UpdateStatus(status); - - Status coord_status; - ImpalaBackendConnection coord( - ExecEnv::GetInstance()->impalad_client_cache(), coord_address(), &coord_status); - if (!coord_status.ok()) { - stringstream s; - s << "Couldn't get a client for " << coord_address() <<"\tReason: " - << coord_status.GetDetail(); - UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str()))); - return; - } - - TReportExecStatusParams params; - params.protocol_version = ImpalaInternalServiceVersion::V1; - params.__set_query_id(query_state_->query_ctx().query_id); - params.__set_fragment_instance_id(instance_ctx_.fragment_instance_id); - exec_status.SetTStatus(¶ms); - params.__set_done(done); - - if (profile != NULL) { - profile->ToThrift(¶ms.profile); - params.__isset.profile = true; - } - - RuntimeState* runtime_state = executor_.runtime_state(); - // If executor_ did not successfully prepare, runtime state may not have been set. - if (runtime_state != NULL) { - // Only send updates to insert status if fragment is finished, the coordinator - // waits until query execution is done to use them anyhow. 
- if (done) { - TInsertExecStatus insert_status; - - if (runtime_state->hdfs_files_to_move()->size() > 0) { - insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move()); - } - if (runtime_state->per_partition_status()->size() > 0) { - insert_status.__set_per_partition_status(*runtime_state->per_partition_status()); - } - - params.__set_insert_exec_status(insert_status); + return sink_->Open(runtime_state_); +} + +Status FragmentInstanceState::ExecInternal() { + RuntimeProfile::Counter* plan_exec_timer = + ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME); + SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); + bool exec_tree_complete = false; + do { + Status status; + row_batch_->Reset(); + { + SCOPED_TIMER(plan_exec_timer); + RETURN_IF_ERROR( + exec_tree_->GetNext(runtime_state_, row_batch_.get(), &exec_tree_complete)); } + if (VLOG_ROW_IS_ON) row_batch_->VLogRows("FragmentInstanceState::ExecInternal()"); + COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows()); + RETURN_IF_ERROR(sink_->Send(runtime_state_, row_batch_.get())); + } while (!exec_tree_complete); + + // Flush the sink *before* stopping the report thread. Flush may need to add some + // important information to the last report that gets sent. (e.g. table sinks record the + // files they have written to in this method) + RETURN_IF_ERROR(sink_->FlushFinal(runtime_state())); + return Status::OK(); +} - // Send new errors to coordinator - runtime_state->GetUnreportedErrors(&(params.error_log)); - } - params.__isset.error_log = (params.error_log.size() > 0); - - TReportExecStatusResult res; - Status rpc_status; - bool retry_is_safe; - // Try to send the RPC 3 times before failing. 
- for (int i = 0; i < 3; ++i) { - rpc_status = coord.DoRpc( - &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe); - if (rpc_status.ok()) { - rpc_status = Status(res.status); - break; +void FragmentInstanceState::Close() { + DCHECK(!report_thread_active_); + DCHECK(runtime_state_ != nullptr); + + // guard against partially-finished Prepare() + if (sink_ != nullptr) sink_->Close(runtime_state_); + + // disconnect mem_usage_sampled_counter_ from the periodic updater before + // RuntimeState::ReleaseResources(), it references the instance memtracker + if (mem_usage_sampled_counter_ != nullptr) { + PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_); + mem_usage_sampled_counter_ = nullptr; + } + + // We need to delete row_batch_ here otherwise we can't delete the instance_mem_tracker_ + // in runtime_state_->ReleaseResources(). + // TODO: do not delete mem trackers in Close()/ReleaseResources(), they are part of + // the control structures we need to preserve until the underlying QueryState + // disappears. + row_batch_.reset(); + if (exec_tree_ != nullptr) exec_tree_->Close(runtime_state_); + runtime_state_->ReleaseResources(); + + // Sanity timer checks +#ifndef NDEBUG + if (profile() != nullptr && timings_profile_ != nullptr) { + int64_t total_time = profile()->total_time_counter()->value(); + int64_t other_time = 0; + for (auto& name: {PREPARE_TIMER_NAME, OPEN_TIMER_NAME, EXEC_TIMER_NAME}) { + RuntimeProfile::Counter* counter = timings_profile_->GetCounter(name); + if (counter != nullptr) other_time += counter->value(); } - if (!retry_is_safe) break; - if (i < 2) SleepForMs(100); + // TODO: IMPALA-4631: Occasionally we see other_time = total_time + 1 for some reason + // we don't yet understand, so add 1 to total_time to avoid DCHECKing in that case. 
+ DCHECK_LE(other_time, total_time + 1); + } +#endif +} + +void FragmentInstanceState::ReportProfileThread() { + VLOG_FILE << "ReportProfileThread(): instance_id=" << PrintId(instance_id()); + unique_lock<mutex> l(report_thread_lock_); + // tell Prepare() that we started + report_thread_active_ = true; + report_thread_started_cv_.notify_one(); + + // Jitter the reporting time of remote fragments by a random amount between + // 0 and the report_interval. This way, the coordinator doesn't get all the + // updates at once so its better for contention as well as smoother progress + // reporting. + int report_fragment_offset = rand() % FLAGS_status_report_interval; + boost::system_time timeout = boost::get_system_time() + + boost::posix_time::seconds(report_fragment_offset); + // We don't want to wait longer than it takes to run the entire fragment. + stop_report_thread_cv_.timed_wait(l, timeout); + + while (report_thread_active_) { + boost::system_time timeout = boost::get_system_time() + + boost::posix_time::seconds(FLAGS_status_report_interval); + + // timed_wait can return because the timeout occurred or the condition variable + // was signaled. We can't rely on its return value to distinguish between the + // two cases (e.g. there is a race here where the wait timed out but before grabbing + // the lock, the condition variable was signaled). Instead, we will use an external + // flag, report_thread_active_, to coordinate this. + stop_report_thread_cv_.timed_wait(l, timeout); + + if (!report_thread_active_) break; + SendReport(false, Status::OK()); + } + + VLOG_FILE << "exiting reporting thread: instance_id=" << instance_id(); +} + +void FragmentInstanceState::SendReport(bool done, const Status& status) { + DCHECK(status.ok() || done); + DCHECK(runtime_state_ != nullptr); + + if (VLOG_FILE_IS_ON) { + VLOG_FILE << "Reporting " << (done ? 
"final " : "") << "profile for instance " + << runtime_state_->fragment_instance_id(); + stringstream ss; + profile()->PrettyPrint(&ss); + VLOG_FILE << ss.str(); + } + + // Update the counter for the peak per host mem usage. + if (per_host_mem_usage_ != nullptr) { + per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption()); + } + + query_state_->ReportExecStatus(done, status, this); +} + +void FragmentInstanceState::StopReportThread() { + if (!report_thread_active_) return; + { + lock_guard<mutex> l(report_thread_lock_); + report_thread_active_ = false; } - if (!rpc_status.ok()) { - UpdateStatus(rpc_status); - executor_.Cancel(); + stop_report_thread_cv_.notify_one(); + report_thread_->Join(); +} + +void FragmentInstanceState::Finalize(const Status& status) { + if (fragment_ctx_.fragment.output_sink.type != TDataSinkType::PLAN_ROOT_SINK) { + // if we haven't already release this thread token in Prepare(), release it now + ReleaseThreadToken(); + } + StopReportThread(); + // It's safe to send final report now that the reporting thread is stopped. 
+ SendReport(true, status); +} + +void FragmentInstanceState::ReleaseThreadToken() { + DCHECK(runtime_state_ != nullptr); + DCHECK(runtime_state_->resource_pool() != nullptr); + runtime_state_->resource_pool()->ReleaseThreadToken(true); + if (avg_thread_tokens_ != nullptr) { + PeriodicCounterUpdater::StopSamplingCounter(avg_thread_tokens_); + } + if (thread_usage_sampled_counter_ != nullptr) { + PeriodicCounterUpdater::StopTimeSeriesCounter(thread_usage_sampled_counter_); } } +Status FragmentInstanceState::WaitForPrepare() { + return prepared_promise_.Get(); +} + +bool FragmentInstanceState::IsPrepared() { + return prepared_promise_.IsSet(); +} + +Status FragmentInstanceState::WaitForOpen() { + return opened_promise_.Get(); +} + void FragmentInstanceState::PublishFilter( int32_t filter_id, const TBloomFilter& thrift_bloom_filter) { VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id()) << " filter_id=" << filter_id; - // Defensively protect against blocking forever in case there's some problem with - // Prepare(). - static const int WAIT_MS = 30000; - bool timed_out = false; // Wait until Prepare() is done, so we know that the filter bank is set up. 
- // TODO: get rid of concurrency in the setup phase as part of the per-query exec rpc - Status prepare_status = prepare_promise_.Get(WAIT_MS, &timed_out); - if (timed_out) { - LOG(ERROR) << "Unexpected timeout in PublishFilter()"; - return; - } - if (!prepare_status.ok()) return; - executor_.runtime_state()->filter_bank()->PublishGlobalFilter( - filter_id, thrift_bloom_filter); + if (!WaitForPrepare().ok()) return; + runtime_state_->filter_bank()->PublishGlobalFilter(filter_id, thrift_bloom_filter); } const TQueryCtx& FragmentInstanceState::query_ctx() const { return query_state_->query_ctx(); } + +ObjectPool* FragmentInstanceState::obj_pool() { + return query_state_->obj_pool(); +} + +RuntimeProfile* FragmentInstanceState::profile() const { + return runtime_state_->runtime_profile(); +} + +void FragmentInstanceState::PrintVolumeIds() { + if (instance_ctx_.per_node_scan_ranges.empty()) return; + + HdfsScanNodeBase::PerVolumeStats per_volume_stats; + for (const PerNodeScanRanges::value_type& entry: instance_ctx_.per_node_scan_ranges) { + HdfsScanNodeBase::UpdateHdfsSplitStats(entry.second, &per_volume_stats); + } + + stringstream str; + HdfsScanNodeBase::PrintHdfsSplitStats(per_volume_stats, &str); + profile()->AddInfoString(HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC, str.str()); + VLOG_FILE + << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query=" + << query_id() << ":\n" << str.str(); +} + http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/fragment-instance-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h index c938a31..28b8a54 100644 --- a/be/src/runtime/fragment-instance-state.h +++ b/be/src/runtime/fragment-instance-state.h @@ -19,11 +19,17 @@ #ifndef IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H #define IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H +#include <string> +#include 
<boost/scoped_ptr.hpp> +#include <boost/thread/mutex.hpp> + #include "common/status.h" #include "util/promise.h" -#include "runtime/plan-fragment-executor.h" #include "gen-cpp/ImpalaInternalService_types.h" +#include "runtime/row-batch.h" +#include "util/promise.h" +#include "util/runtime-profile.h" namespace impala { @@ -32,85 +38,211 @@ class TPlanFragmentInstanceCtx; class TBloomFilter; class TUniqueId; class TNetworkAddress; +class TQueryCtx; class QueryState; -class PlanFragmentExecutor; class RuntimeProfile; - -/// Collection of state specific to the execution of a particular fragment instance, -/// as well as manager of the execution of that fragment instance, including -/// set-up and tear-down. -/// Tear-down happens automatically in the d'tor and frees all memory allocated for -/// this fragment instance and closes all data streams. +class ExecNode; +class PlanRootSink; +class Thread; +class DataSink; +class RuntimeState; + +/// FragmentInstanceState handles all aspects of the execution of a single plan fragment +/// instance, including setup and finalization, both in the success and error case. +/// Close() happens automatically at the end of Exec() and frees all memory allocated +/// for this fragment instance and closes all data streams. +/// +/// The FIS makes an aggregated profile for the entire fragment available, which +/// includes profile information for the plan itself as well as the output sink. +/// The FIS periodically makes a ReportExecStatus RPC to the coordinator to report the +/// execution status and profile. The frequency of those reports is controlled by the flag +/// status_report_interval; setting that flag to 0 disables periodic reporting altogether. +/// Regardless of the value of that flag, a report is sent at least once at the end of +/// execution with an overall status and profile (and 'done' indicator). +/// The FIS will send at least one final status report. 
If execution ended with an error, +/// that error status will be part of the final report (it will not be overridden by +/// the resulting cancellation). /// -/// Aside from Cancel(), which may be called asynchronously, this class is not -/// thread-safe. +/// This class is thread-safe. +/// All non-getter public functions other than Exec() block until the Prepare phase +/// finishes. +/// No member variables, other than the ones passed to the c'tor, are valid before +/// the Prepare phase finishes. /// /// TODO: -/// - merge PlanFragmentExecutor into this class -/// - move tear-down logic out of d'tor and into ReleaseResources() function -/// - as part of the per-query exec rpc, get rid of concurrency during setup -/// (and remove prepare_promise_) -/// - move ReportStatusCb() logic into PFE::SendReport() and get rid of the callback +/// - absorb RuntimeState? +/// - should WaitForPrepare/Open() return the overall execution status, if there +/// was a failure? class FragmentInstanceState { public: FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, - const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& desc_tbl); + const TPlanFragmentInstanceCtx& instance_ctx); - /// Frees up all resources allocated in Exec(). - /// It is an error to delete a FragmentInstanceState before Exec() returns. - ~FragmentInstanceState() { } + /// Main loop of fragment instance execution. Blocks until execution finishes and + /// automatically releases resources. Returns execution status. + /// Must only be called once. + Status Exec() WARN_UNUSED_RESULT; - /// Main loop of plan fragment execution. Blocks until execution finishes. - void Exec(); + /// Cancels execution and sends a final status report. Idempotent. + void Cancel(); - /// Returns current execution status, if there was an error. Otherwise cancels - /// the fragment and returns OK. 
- Status Cancel(); + /// Blocks until the Prepare phase of Exec() is finished and returns the status. + Status WaitForPrepare(); + + /// Returns true if the Prepare phase of Exec() is finished. + bool IsPrepared(); + + /// Blocks until the Prepare phase of Exec() is finished and the exec tree is + /// opened, and returns that status. If the preparation phase encountered an error, + /// WaitForOpen() will return that error without blocking. + Status WaitForOpen(); /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank. void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter); + /// Returns fragment instance's sink if this is the root fragment instance. Valid after + /// the Prepare phase. May be nullptr. + PlanRootSink* root_sink() { return root_sink_; } + + /// Name of the counter that is tracking per query, per host peak mem usage. + /// TODO: this doesn't look like it belongs here + static const std::string PER_HOST_PEAK_MEM_COUNTER; + QueryState* query_state() { return query_state_; } - PlanFragmentExecutor* executor() { return &executor_; } + RuntimeState* runtime_state() { return runtime_state_; } + RuntimeProfile* profile() const; const TQueryCtx& query_ctx() const; const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; } const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; } const TUniqueId& query_id() const { return query_ctx().query_id; } const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; } const TNetworkAddress& coord_address() const { return query_ctx().coord_address; } + ObjectPool* obj_pool(); private: QueryState* query_state_; - const TPlanFragmentCtx fragment_ctx_; - const TPlanFragmentInstanceCtx instance_ctx_; - - /// instance-specific descriptor table - /// TODO: remove when switching to per-query exec rpc - const TDescriptorTable desc_tbl_; - - PlanFragmentExecutor executor_; - - /// protects exec_status_ - boost::mutex 
status_lock_; - - /// set in ReportStatusCb(); - /// if set to anything other than OK, execution has terminated w/ an error - Status exec_status_; - - /// Barrier for the completion of executor_.Prepare(). - Promise<Status> prepare_promise_; - - /// Update 'exec_status_' w/ 'status', if the former is not already an error. - /// Returns the value of 'exec_status_' after this method completes. - Status UpdateStatus(const Status& status); - - /// Callback for executor; updates exec_status_ if 'status' indicates an error - /// or if there was a thrift error. - /// If not NULL, `profile` is encoded as a Thrift structure and transmitted as part of - /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created - /// for this fragment (e.g. when the fragment has failed during preparation). - /// The executor must ensure that there is only one invocation at a time. - void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done); + const TPlanFragmentCtx& fragment_ctx_; + const TPlanFragmentInstanceCtx& instance_ctx_; + + /// All following member variables that are initialized to nullptr are set + /// in Prepare(). + + ExecNode* exec_tree_ = nullptr; // lives in obj_pool() + RuntimeState* runtime_state_ = nullptr; // lives in obj_pool() + + /// profile reporting-related + boost::scoped_ptr<Thread> report_thread_; + boost::mutex report_thread_lock_; + + /// Indicates that profile reporting thread should stop. + /// Tied to report_thread_lock_. + boost::condition_variable stop_report_thread_cv_; + + /// Indicates that profile reporting thread started. + /// Tied to report_thread_lock_. + boost::condition_variable report_thread_started_cv_; + + /// When the report thread starts, it sets report_thread_active_ to true and signals + /// report_thread_started_cv_. The report thread is shut down by setting + /// report_thread_active_ to false and signalling stop_report_thread_cv_. Protected + /// by report_thread_lock_. 
+ bool report_thread_active_ = false; + + /// Profile for timings for each stage of the plan fragment instance's lifecycle. + /// Lives in obj_pool(). + RuntimeProfile* timings_profile_ = nullptr; + + /// Output sink for rows sent to this fragment. Created in Prepare(), lives in + /// obj_pool(). + DataSink* sink_ = nullptr; + + /// Set if this fragment instance is the root of the entire plan, so that a consumer can + /// pull results by calling root_sink_->GetNext(). Same object as sink_. + PlanRootSink* root_sink_ = nullptr; + + /// should live in obj_pool(), but managed separately so we can delete it in Close() + boost::scoped_ptr<RowBatch> row_batch_; + + /// Set when Prepare() returns. + Promise<Status> prepared_promise_; + + /// Set when OpenInternal() returns. + Promise<Status> opened_promise_; + + /// A counter for the per query, per host peak mem usage. Note that this is not the + /// max of the peak memory of all fragments running on a host since it needs to take + /// into account when they are running concurrently. All fragments for a single query + /// on a single host will have the same value for this counter. + RuntimeProfile::Counter* per_host_mem_usage_ = nullptr; + + /// Number of rows returned by this fragment + /// TODO: by this instance? + RuntimeProfile::Counter* rows_produced_counter_ = nullptr; + + /// Average number of thread tokens for the duration of the fragment instance execution. + /// Instances that do a lot of cpu work (non-coordinator fragment) will have at + /// least 1 token. Instances that contain a hdfs scan node will have 1+ tokens + /// depending on system load. Other nodes (e.g. hash join node) can also reserve + /// additional tokens. + /// This is a measure of how much CPU resources this instance used during the course + /// of the execution. + /// TODO-MT: remove + RuntimeProfile::Counter* avg_thread_tokens_ = nullptr; + + /// Sampled memory usage at even time intervals. 
+ RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_ = nullptr; + + /// Sampled thread usage (tokens) at even time intervals. + RuntimeProfile::TimeSeriesCounter* thread_usage_sampled_counter_ = nullptr; + + /// Prepare for execution. runtime_state() will not be valid until Prepare() is called. + /// runtime_state() will always be valid after Prepare() returns. + /// If request.query_options.mem_limit > 0, it is used as an + /// approximate limit on the number of bytes this query can consume at runtime. The + /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit. + /// + /// A failure in Prepare() will result in partially-initialized state. + Status Prepare() WARN_UNUSED_RESULT; + + /// Executes Open() logic and returns resulting status. + Status Open() WARN_UNUSED_RESULT; + + /// Pulls row batches from exec_tree_ and pushes them to sink_ in a loop. Returns + /// OK if the input was exhausted and sent to the sink successfully, an error otherwise. + /// If ExecInternal() returns without an error condition, all rows will have been sent + /// to the sink and the sink will have been flushed. + Status ExecInternal() WARN_UNUSED_RESULT; + + /// Closes the underlying fragment instance and frees up all resources allocated in + /// Prepare() and Open(). Assumes the report thread is stopped. Can handle + /// partially-finished Prepare(). + void Close(); + + /// Main loop of profile reporting thread. + /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to + /// false. This will not send the final report. + void ReportProfileThread(); + + /// Invokes the report callback. If 'done' is true, sends the final report with + /// 'status' and the profile. This type of report is sent once and only by the + /// instance execution thread. Otherwise, a profile-only report is sent, which the + /// ReportProfileThread() thread will do periodically. 
+ void SendReport(bool done, const Status& status); + + /// Called when execution is complete to finalize counters and send the final status + /// report. Must be called only once. Can handle partially-finished Prepare(). + void Finalize(const Status& status); + + /// Releases the thread token for this fragment executor. Can handle + /// partially-finished Prepare(). + void ReleaseThreadToken(); + + /// Stops report thread, if one is running. Blocks until report thread terminates. + /// Idempotent. + void StopReportThread(); + + /// Print stats about scan ranges for each volumeId in params to info log. + void PrintVolumeIds(); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/mem-tracker.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc index cfd1881..4ad9b26 100644 --- a/be/src/runtime/mem-tracker.cc +++ b/be/src/runtime/mem-tracker.cc @@ -151,7 +151,7 @@ MemTracker* PoolMemTrackerRegistry::GetRequestPoolMemTracker( lock_guard<SpinLock> l(pool_to_mem_trackers_lock_); PoolTrackersMap::iterator it = pool_to_mem_trackers_.find(pool_name); if (it != pool_to_mem_trackers_.end()) { - MemTracker* tracker = it->second; + MemTracker* tracker = it->second.get(); DCHECK(pool_name == tracker->pool_name_); return tracker; } @@ -161,7 +161,7 @@ MemTracker* PoolMemTrackerRegistry::GetRequestPoolMemTracker( new MemTracker(-1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name), ExecEnv::GetInstance()->process_mem_tracker()); tracker->pool_name_ = pool_name; - pool_to_mem_trackers_[pool_name] = tracker; + pool_to_mem_trackers_.emplace(pool_name, unique_ptr<MemTracker>(tracker)); return tracker; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/mem-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h 
index 863d3c7..e3282c7 100644 --- a/be/src/runtime/mem-tracker.h +++ b/be/src/runtime/mem-tracker.h @@ -446,7 +446,7 @@ class PoolMemTrackerRegistry { /// All per-request pool MemTracker objects. It is assumed that request pools will live /// for the entire duration of the process lifetime so MemTrackers are never removed /// from this map. Protected by 'pool_to_mem_trackers_lock_' - typedef boost::unordered_map<std::string, MemTracker*> PoolTrackersMap; + typedef boost::unordered_map<std::string, std::unique_ptr<MemTracker>> PoolTrackersMap; PoolTrackersMap pool_to_mem_trackers_; /// IMPALA-3068: Use SpinLock instead of boost::mutex so that the lock won't /// automatically destroy itself as part of process teardown, which could cause races.
