Repository: incubator-impala Updated Branches: refs/heads/master c07391ce5 -> fed75810b
IMPALA-4252: Move runtime filters to ScanNode As a preliminary step towards adding runtime filters for Kudu scans, this patch moves runtime filter related code from HdfsScanNodeBase to ScanNode so that it's available to KuduScanNodeBase. The change was mechanical with no logic changes, except for moving the calculation of the max wait time into WaitForRuntimeFilters(). Testing: - Ran existing runtime filter tests. Change-Id: I17bdc869046dc2cd837d02f333441fa6324ff086 Reviewed-on: http://gerrit.cloudera.org:8080/8148 Reviewed-by: Tim Armstrong <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c38a1ac1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c38a1ac1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c38a1ac1 Branch: refs/heads/master Commit: c38a1ac1edf77a342ce52bdf544d79d5f98f197d Parents: c07391c Author: Thomas Tauber-Marshall <[email protected]> Authored: Tue Sep 26 11:43:46 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Mon Oct 2 20:01:25 2017 +0000 ---------------------------------------------------------------------- be/src/exec/hdfs-scan-node-base.cc | 94 +---------------------------- be/src/exec/hdfs-scan-node-base.h | 20 ------ be/src/exec/kudu-scan-node-base.cc | 1 - be/src/exec/kudu-scan-node-base.h | 2 - be/src/exec/scan-node.cc | 104 ++++++++++++++++++++++++++++++++ be/src/exec/scan-node.h | 24 ++++++++ 6 files changed, 131 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c38a1ac1/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index e74efcd..5582ead 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -58,9 +58,6 @@ #include "common/names.h" -DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, " - "that a scan node will wait for expected runtime filters to arrive."); - // TODO: Remove this flag in a compatibility-breaking release. DEFINE_bool(suppress_unknown_disk_id_warnings, false, "Deprecated."); @@ -72,7 +69,6 @@ namespace filesystem = boost::filesystem; using namespace impala; using namespace llvm; using namespace strings; -using boost::algorithm::join; const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC = "Hdfs split stats (<volume id>:<# splits>/<split lengths>)"; @@ -105,7 +101,7 @@ HdfsScanNodeBase::~HdfsScanNodeBase() { } Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::Init(tnode, state)); + RETURN_IF_ERROR(ScanNode::Init(tnode, state)); // Add collection item conjuncts for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) { @@ -119,37 +115,6 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { DCHECK(conjuncts_map_[tuple_id_].empty()); conjuncts_map_[tuple_id_] = conjuncts_; - const TQueryOptions& query_options = state->query_options(); - for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) { - auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id); - DCHECK(it != filter_desc.planid_to_target_ndx.end()); - const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second]; - if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL && - !target.is_local_target) { - continue; - } - if (query_options.disable_row_runtime_filtering && - !target.is_bound_by_partition_columns) { - continue; - } - ScalarExpr* filter_expr; - RETURN_IF_ERROR( - ScalarExpr::Create(target.target_expr, *row_desc(), state, &filter_expr)); - filter_exprs_.push_back(filter_expr); - - // TODO: Move this to Prepare() - filter_ctxs_.emplace_back(); - FilterContext& filter_ctx = filter_ctxs_.back(); - filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false); - string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id, - PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES)); - RuntimeProfile* profile = - RuntimeProfile::Create(state->obj_pool(), filter_profile_title); - runtime_profile_->AddChild(profile); - filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile, - target.is_bound_by_partition_columns)); - } - // Add min max conjuncts if (min_max_tuple_id_ != -1) { min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_); @@ -166,7 +131,6 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { /// TODO: Break up this very long function. Status HdfsScanNodeBase::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); - runtime_state_ = state; RETURN_IF_ERROR(ScanNode::Prepare(state)); // Prepare collection conjuncts @@ -182,13 +146,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { } } - DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size()); - for (int i = 0; i < filter_exprs_.size(); ++i) { - RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, pool_, - expr_mem_pool(), &filter_ctxs_[i].expr_eval)); - AddEvaluatorToFree(filter_ctxs_[i].expr_eval); - } - // Prepare min max statistics conjuncts. if (min_max_tuple_id_ != -1) { RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_, @@ -365,7 +322,7 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) { } Status HdfsScanNodeBase::Open(RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::Open(state)); + RETURN_IF_ERROR(ScanNode::Open(state)); // Open collection conjuncts for (auto& entry: conjunct_evals_map_) { @@ -377,11 +334,6 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { // Open min max conjuncts RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state)); - // Open Runtime filter expressions. - for (FilterContext& ctx : filter_ctxs_) { - RETURN_IF_ERROR(ctx.expr_eval->Open(state)); - } - // Create template tuples for all partitions. for (int64_t partition_id: partition_ids_) { HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id); @@ -490,12 +442,6 @@ void HdfsScanNodeBase::Close(RuntimeState* state) { // Close min max conjunct ScalarExprEvaluator::Close(min_max_conjunct_evals_, state); ScalarExpr::Close(min_max_conjuncts_); - - // Close filter - for (auto& filter_ctx : filter_ctxs_) { - if (filter_ctx.expr_eval != nullptr) filter_ctx.expr_eval->Close(state); - } - ScalarExpr::Close(filter_exprs_); ScanNode::Close(state); } @@ -509,11 +455,7 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) { return Status::OK(); } - int32 wait_time_ms = FLAGS_runtime_filter_wait_time_ms; - if (state->query_options().runtime_filter_wait_time_ms > 0) { - wait_time_ms = state->query_options().runtime_filter_wait_time_ms; - } - if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(wait_time_ms); + if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(); // Apply dynamic partition-pruning per-file. FileFormatsMap matching_per_type_files; for (const FileFormatsMap::value_type& v: per_type_files_) { @@ -560,36 +502,6 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates( return true; } -bool HdfsScanNodeBase::WaitForRuntimeFilters(int32_t time_ms) { - vector<string> arrived_filter_ids; - vector<string> missing_filter_ids; - int32_t start = MonotonicMillis(); - for (auto& ctx: filter_ctxs_) { - string filter_id = Substitute("$0", ctx.filter->id()); - if (ctx.filter->WaitForArrival(time_ms)) { - arrived_filter_ids.push_back(filter_id); - } else { - missing_filter_ids.push_back(filter_id); - } - } - int32_t end = MonotonicMillis(); - const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS); - - if (arrived_filter_ids.size() == filter_ctxs_.size()) { - runtime_profile()->AddInfoString("Runtime filters", - Substitute("All filters arrived. Waited $0", wait_time)); - VLOG_QUERY << "Filters arrived. Waited " << wait_time; - return true; - } - - const string& filter_str = Substitute( - "Not all filters arrived (arrived: [$0], missing [$1]), waited for $2", - join(arrived_filter_ids, ", "), join(missing_filter_ids, ", "), wait_time); - runtime_profile()->AddInfoString("Runtime filters", filter_str); - VLOG_QUERY << filter_str; - return false; -} - DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local, const DiskIoMgr::BufferOpts& buffer_opts, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c38a1ac1/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 7e9d322..da95ecc 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -153,7 +153,6 @@ class HdfsScanNodeBase : public ScanNode { const TupleDescriptor* tuple_desc() const { return tuple_desc_; } const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; } const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); } - RuntimeState* runtime_state() { return runtime_state_; } int skip_header_line_count() const { return skip_header_line_count_; } DiskIoRequestContext* reader_context() { return reader_context_; } bool optimize_parquet_count_star() const { return optimize_parquet_count_star_; } @@ -306,16 +305,10 @@ class HdfsScanNodeBase : public ScanNode { bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name, const std::vector<FilterContext>& filter_ctxs); - const std::vector<ScalarExpr*>& filter_exprs() const { return filter_exprs_; } - - const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; } - protected: friend class ScannerContext; friend class HdfsScanner; - RuntimeState* runtime_state_ = nullptr; - /// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics. const int min_max_tuple_id_; @@ -406,14 +399,6 @@ class HdfsScanNodeBase : public ScanNode { typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap; PathToSlotIdxMap path_to_materialized_slot_idx_; - /// Expressions to evaluate the input rows for filtering against runtime filters. - std::vector<ScalarExpr*> filter_exprs_; - - /// List of contexts for expected runtime filters for this scan node. These contexts are - /// cloned by individual scanners to be used in multi-threaded contexts, passed through - /// the per-scanner ScannerContext. Correspond to exprs in 'filter_exprs_'. - std::vector<FilterContext> filter_ctxs_; - /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise> /// for 0 <= i < total # columns in table // @@ -529,11 +514,6 @@ class HdfsScanNodeBase : public ScanNode { bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs, const THdfsFileFormat::type& file_type, HdfsFileDesc* file); - /// Waits for up to time_ms for runtime filters to arrive, checking every 20ms. Returns - /// true if all filters arrived within the time limit (as measured from the time of - /// RuntimeFilterBank::RegisterFilter()), false otherwise. - bool WaitForRuntimeFilters(int32_t time_ms); - /// Stops periodic counters and aggregates counter values for the entire scan node. /// This should be called as soon as the scan node is complete to get the most accurate /// counter values. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c38a1ac1/be/src/exec/kudu-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc index 526374c..feb0af7 100644 --- a/be/src/exec/kudu-scan-node-base.cc +++ b/be/src/exec/kudu-scan-node-base.cc @@ -62,7 +62,6 @@ KuduScanNodeBase::~KuduScanNodeBase() { Status KuduScanNodeBase::Prepare(RuntimeState* state) { RETURN_IF_ERROR(ScanNode::Prepare(state)); - runtime_state_ = state; scan_ranges_complete_counter_ = ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c38a1ac1/be/src/exec/kudu-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h index 08289e1..16910c5 100644 --- a/be/src/exec/kudu-scan-node-base.h +++ b/be/src/exec/kudu-scan-node-base.h @@ -56,8 +56,6 @@ class KuduScanNodeBase : public ScanNode { /// Not thread safe, access must be synchronized. const std::string* GetNextScanToken(); - RuntimeState* runtime_state_; - private: friend class KuduScanner; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c38a1ac1/be/src/exec/scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc index d09bb6b..94f73c0 100644 --- a/be/src/exec/scan-node.cc +++ b/be/src/exec/scan-node.cc @@ -19,10 +19,18 @@ #include <boost/bind.hpp> +#include "exprs/scalar-expr.h" +#include "runtime/runtime-filter.inline.h" +#include "runtime/runtime-state.h" #include "util/runtime-profile-counters.h" #include "common/names.h" +DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, " + "that a scan node will wait for expected runtime filters to arrive."); + +using boost::algorithm::join; + namespace impala { // Changing these names have compatibility concerns. @@ -47,8 +55,46 @@ const string ScanNode::AVERAGE_HDFS_READ_THREAD_CONCURRENCY = const string ScanNode::NUM_SCANNER_THREADS_STARTED = "NumScannerThreadsStarted"; +Status ScanNode::Init(const TPlanNode& tnode, RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::Init(tnode, state)); + + const TQueryOptions& query_options = state->query_options(); + for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) { + auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id); + DCHECK(it != filter_desc.planid_to_target_ndx.end()); + const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second]; + if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL && + !target.is_local_target) { + continue; + } + if (query_options.disable_row_runtime_filtering && + !target.is_bound_by_partition_columns) { + continue; + } + ScalarExpr* filter_expr; + RETURN_IF_ERROR( + ScalarExpr::Create(target.target_expr, *row_desc(), state, &filter_expr)); + filter_exprs_.push_back(filter_expr); + + // TODO: Move this to Prepare() + filter_ctxs_.emplace_back(); + FilterContext& filter_ctx = filter_ctxs_.back(); + filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false); + string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id, + PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES)); + RuntimeProfile* profile = + RuntimeProfile::Create(state->obj_pool(), filter_profile_title); + runtime_profile_->AddChild(profile); + filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile, + target.is_bound_by_partition_columns)); + } + + return Status::OK(); +} + Status ScanNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); + runtime_state_ = state; RETURN_IF_ERROR(ExecNode::Prepare(state)); scanner_thread_counters_ = @@ -65,15 +111,73 @@ Status ScanNode::Prepare(RuntimeState* state) { TOTAL_THROUGHPUT_COUNTER, bytes_read_counter_); materialize_tuple_timer_ = ADD_CHILD_TIMER(runtime_profile(), MATERIALIZE_TUPLE_TIMER, SCANNER_THREAD_TOTAL_WALLCLOCK_TIME); + + DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size()); + for (int i = 0; i < filter_exprs_.size(); ++i) { + RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, pool_, + expr_mem_pool(), &filter_ctxs_[i].expr_eval)); + AddEvaluatorToFree(filter_ctxs_[i].expr_eval); + } + + return Status::OK(); +} + +Status ScanNode::Open(RuntimeState* state) { + RETURN_IF_ERROR(ExecNode::Open(state)); + + // Open Runtime filter expressions. + for (FilterContext& ctx : filter_ctxs_) { + RETURN_IF_ERROR(ctx.expr_eval->Open(state)); + } + return Status::OK(); } void ScanNode::Close(RuntimeState* state) { if (is_closed()) return; + // Close filter + for (auto& filter_ctx : filter_ctxs_) { + if (filter_ctx.expr_eval != nullptr) filter_ctx.expr_eval->Close(state); + } + ScalarExpr::Close(filter_exprs_); // ScanNode::Prepare() started periodic counters including 'total_throughput_counter_' // and 'bytes_read_timeseries_counter_'. Subclasses may also have started counters. runtime_profile_->StopPeriodicCounters(); ExecNode::Close(state); } +bool ScanNode::WaitForRuntimeFilters() { + int32 wait_time_ms = FLAGS_runtime_filter_wait_time_ms; + if (runtime_state_->query_options().runtime_filter_wait_time_ms > 0) { + wait_time_ms = runtime_state_->query_options().runtime_filter_wait_time_ms; + } + vector<string> arrived_filter_ids; + vector<string> missing_filter_ids; + int32_t start = MonotonicMillis(); + for (auto& ctx: filter_ctxs_) { + string filter_id = Substitute("$0", ctx.filter->id()); + if (ctx.filter->WaitForArrival(wait_time_ms)) { + arrived_filter_ids.push_back(filter_id); + } else { + missing_filter_ids.push_back(filter_id); + } + } + int32_t end = MonotonicMillis(); + const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS); + + if (arrived_filter_ids.size() == filter_ctxs_.size()) { + runtime_profile()->AddInfoString("Runtime filters", + Substitute("All filters arrived. Waited $0", wait_time)); + VLOG_QUERY << "Filters arrived. Waited " << wait_time; + return true; + } + + const string& filter_str = Substitute( + "Not all filters arrived (arrived: [$0], missing [$1]), waited for $2", + join(arrived_filter_ids, ", "), join(missing_filter_ids, ", "), wait_time); + runtime_profile()->AddInfoString("Runtime filters", filter_str); + VLOG_QUERY << filter_str; + return false; +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c38a1ac1/be/src/exec/scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h index 4b11361..ff79857 100644 --- a/be/src/exec/scan-node.h +++ b/be/src/exec/scan-node.h @@ -21,6 +21,7 @@ #include <string> #include "exec/exec-node.h" +#include "exec/filter-context.h" #include "util/runtime-profile.h" #include "gen-cpp/ImpalaInternalService_types.h" @@ -84,7 +85,9 @@ class ScanNode : public ExecNode { scan_range_params_(NULL), active_scanner_thread_counter_(TUnit::UNIT, 0) {} + virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT; virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT; /// Stops all periodic counters and calls ExecNode::Close(). Subclasses of ScanNode can /// start periodic counters and rely on this function stopping them. @@ -98,6 +101,7 @@ class ScanNode : public ExecNode { virtual bool IsScanNode() const { return true; } + RuntimeState* runtime_state() { return runtime_state_; } RuntimeProfile::Counter* bytes_read_counter() const { return bytes_read_counter_; } RuntimeProfile::Counter* rows_read_counter() const { return rows_read_counter_; } RuntimeProfile::Counter* collection_items_read_counter() const { @@ -143,7 +147,13 @@ class ScanNode : public ExecNode { static const std::string AVERAGE_HDFS_READ_THREAD_CONCURRENCY; static const std::string NUM_SCANNER_THREADS_STARTED; + const std::vector<ScalarExpr*>& filter_exprs() const { return filter_exprs_; } + + const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; } + protected: + RuntimeState* runtime_state_ = nullptr; + /// The scan ranges this scan node is responsible for. Not owned. const std::vector<TScanRangeParams>* scan_range_params_; @@ -176,6 +186,20 @@ class ScanNode : public ExecNode { RuntimeProfile::Counter* average_scanner_thread_concurrency_; RuntimeProfile::Counter* num_scanner_threads_started_counter_; + + /// Expressions to evaluate the input rows for filtering against runtime filters. + std::vector<ScalarExpr*> filter_exprs_; + + /// List of contexts for expected runtime filters for this scan node. These contexts are + /// cloned by individual scanners to be used in multi-threaded contexts, passed through + /// the per-scanner ScannerContext. Correspond to exprs in 'filter_exprs_'. + std::vector<FilterContext> filter_ctxs_; + + /// Waits for runtime filters to arrive, checking every 20ms. Max wait time is specified + /// by the 'runtime_filter_wait_time_ms' flag, which is overridden by the query option + /// of the same name. Returns true if all filters arrived within the time limit (as + /// measured from the time of RuntimeFilterBank::RegisterFilter()), false otherwise. + bool WaitForRuntimeFilters(); }; }
