IMPALA-3168: replace HashTable parameters with constants

This addresses the regression for low-NDV aggs resulting from prefetching. The idea is that for low-NDV aggs, prefetching increases the number of instructions and memory references but doesn't provide any compensating benefit. This change uses codegen to replace hash-table parameters (stores_nulls, finds_some_nulls, stores_tuples, stores_duplicates and quadratic_probing) with constant values, which reduces the instruction count and number of memory references in aggregations.
Preliminary perf results show that a low-NDV decimal agg is around 20% faster (2.1s -> 1.7s) and a high-NDV decimal agg is around 7% faster (15s -> 14s). I haven't investigated how much of the speedup is reduced codegen time. Change-Id: I483a19662c90ca54bc21d60fd6ba97dbed93eaef Reviewed-on: http://gerrit.cloudera.org:8080/3088 Tested-by: Internal Jenkins Reviewed-by: Dan Hecht <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/265e39f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/265e39f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/265e39f8 Branch: refs/heads/master Commit: 265e39f89a914541064555b764bfd8f7747bb294 Parents: 9f4276e Author: Tim Armstrong <[email protected]> Authored: Fri May 13 15:48:59 2016 -0700 Committer: Tim Armstrong <[email protected]> Committed: Tue May 17 10:09:06 2016 -0700 ---------------------------------------------------------------------- be/src/codegen/llvm-codegen.cc | 52 ++++++++++++++++---- be/src/codegen/llvm-codegen.h | 16 +++++++ be/src/exec/hash-table-test.cc | 3 +- be/src/exec/hash-table.cc | 32 ++++++++++--- be/src/exec/hash-table.h | 45 ++++++++++++++---- be/src/exec/hash-table.inline.h | 60 +++++++++++++----------- be/src/exec/partitioned-aggregation-node.cc | 26 ++++++++-- be/src/exec/partitioned-hash-join-node.cc | 37 +++++++++++++++ 8 files changed, 215 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/codegen/llvm-codegen.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.cc b/be/src/codegen/llvm-codegen.cc index 0651bf7..b7f0601 100644 --- a/be/src/codegen/llvm-codegen.cc +++ b/be/src/codegen/llvm-codegen.cc @@ -622,27 +622,61 @@ Function* 
LlvmCodeGen::FnPrototype::GeneratePrototype( } int LlvmCodeGen::ReplaceCallSites(Function* caller, Function* new_fn, - const string& replacee_name) { + const string& target_name) { DCHECK(!is_compiled_); DCHECK(caller->getParent() == module_); DCHECK(caller != NULL); DCHECK(new_fn != NULL); + + vector<CallInst*> call_sites; + FindCallSites(caller, target_name, &call_sites); int replaced = 0; + for (CallInst* call_instr: call_sites) { + // Replace the called function + call_instr->setCalledFunction(new_fn); + ++replaced; + } + return replaced; +} + +int LlvmCodeGen::ReplaceCallSitesWithValue(Function* caller, Value* replacement, + const string& target_name) { + DCHECK(!is_compiled_); + DCHECK(caller->getParent() == module_); + DCHECK(caller != NULL); + DCHECK(replacement != NULL); + + vector<CallInst*> call_sites; + FindCallSites(caller, target_name, &call_sites); + int replaced = 0; + for (CallInst* call_instr: call_sites) { + call_instr->replaceAllUsesWith(replacement); + ++replaced; + } + return replaced; +} + +int LlvmCodeGen::ReplaceCallSitesWithBoolConst(llvm::Function* caller, bool constant, + const string& target_name) { + Value* replacement = ConstantInt::get(Type::getInt1Ty(context()), constant); + return ReplaceCallSitesWithValue(caller, replacement, target_name); +} + +void LlvmCodeGen::FindCallSites(Function* caller, const string& target_name, + vector<CallInst*>* results) { for (inst_iterator iter = inst_begin(caller); iter != inst_end(caller); ++iter) { Instruction* instr = &*iter; - // look for call instructions + // Look for call instructions. Note that we'll ignore invoke and other related + // instructions that are not a plain function call. 
if (CallInst::classof(instr)) { CallInst* call_instr = reinterpret_cast<CallInst*>(instr); - Function* old_fn = call_instr->getCalledFunction(); - // look for call instruction that matches the name - if (old_fn != NULL && old_fn->getName().find(replacee_name) != string::npos) { - // Replace the called function - call_instr->setCalledFunction(new_fn); - ++replaced; + Function* callee = call_instr->getCalledFunction(); + // Check for substring match. + if (callee != NULL && callee->getName().find(target_name) != string::npos) { + results->push_back(call_instr); } } } - return replaced; } Function* LlvmCodeGen::CloneFunction(Function* fn) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/codegen/llvm-codegen.h ---------------------------------------------------------------------- diff --git a/be/src/codegen/llvm-codegen.h b/be/src/codegen/llvm-codegen.h index d4255a0..332e599 100644 --- a/be/src/codegen/llvm-codegen.h +++ b/be/src/codegen/llvm-codegen.h @@ -261,6 +261,11 @@ class LlvmCodeGen { int ReplaceCallSites(llvm::Function* caller, llvm::Function* new_fn, const std::string& target_name); + /// Same as ReplaceCallSites(), except replaces the function call instructions with the + /// boolean value 'constant'. + int ReplaceCallSitesWithBoolConst(llvm::Function* caller, bool constant, + const std::string& target_name); + /// Returns a copy of fn. The copy is added to the module. llvm::Function* CloneFunction(llvm::Function* fn); @@ -464,6 +469,17 @@ class LlvmCodeGen { /// Clears generated hash fns. This is only used for testing. void ClearHashFns(); + /// Replace calls to functions in 'caller' where the callee's name has 'target_name' + /// as a substring. Calls to functions are replaced with the value 'replacement'. The + /// return value is the number of calls replaced. 
+ int ReplaceCallSitesWithValue(llvm::Function* caller, llvm::Value* replacement, + const std::string& target_name); + + /// Finds call instructions in 'caller' where 'target_name' is a substring of the + /// callee's name. Found instructions are appended to the 'results' vector. + static void FindCallSites(llvm::Function* caller, const std::string& target_name, + std::vector<llvm::CallInst*>* results); + /// Whether InitializeLlvm() has been called. static bool llvm_initialized_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/exec/hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc index 7559473..d6cd196 100644 --- a/be/src/exec/hash-table-test.cc +++ b/be/src/exec/hash-table-test.cc @@ -189,7 +189,7 @@ class HashTableTest : public testing::Test { // Initial_num_buckets must be a power of two. EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets)); int64_t max_num_buckets = 1L << 31; - table->reset(new HashTable(quadratic, runtime_state_, client, 1, NULL, + table->reset(new HashTable(quadratic, runtime_state_, client, true, 1, NULL, max_num_buckets, initial_num_buckets)); return (*table)->Init(); } @@ -239,6 +239,7 @@ class HashTableTest : public testing::Test { probe_expr_ctxs_, true /* stores_nulls_ */, vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, &ht_ctx); EXPECT_OK(status); + for (int i = 0; i < 2; ++i) { if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue; BufferedTupleStream::RowIdx dummy_row_idx; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/exec/hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index 719b600..7ff2886 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -84,8 +84,8 @@ static const int 
NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs, const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls, - const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels, - MemTracker* tracker) + const std::vector<bool>& finds_nulls, int32_t initial_seed, + int max_levels, MemTracker* tracker) : build_expr_ctxs_(build_expr_ctxs), probe_expr_ctxs_(probe_expr_ctxs), stores_nulls_(stores_nulls), @@ -341,20 +341,21 @@ void HashTableCtx::ExprValuesCache::ResetForRead() { const double HashTable::MAX_FILL_FACTOR = 0.75f; HashTable* HashTable::Create(RuntimeState* state, - BufferedBlockMgr::Client* client, int num_build_tuples, + BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets, int64_t initial_num_buckets) { - return new HashTable(FLAGS_enable_quadratic_probing, state, client, + return new HashTable(FLAGS_enable_quadratic_probing, state, client, stores_duplicates, num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets); } HashTable::HashTable(bool quadratic_probing, RuntimeState* state, - BufferedBlockMgr::Client* client, int num_build_tuples, BufferedTupleStream* stream, - int64_t max_num_buckets, int64_t num_buckets) + BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples, + BufferedTupleStream* stream, int64_t max_num_buckets, int64_t num_buckets) : state_(state), block_mgr_client_(client), tuple_stream_(stream), stores_tuples_(num_build_tuples == 1), + stores_duplicates_(stores_duplicates), quadratic_probing_(quadratic_probing), total_data_page_size_(0), next_node_(NULL), @@ -1154,3 +1155,22 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, bool force_null_equality } return Status::OK(); } + +Status HashTableCtx::ReplaceHashTableConstants(RuntimeState* state, + bool stores_duplicates, int num_build_tuples, Function* 
fn, + HashTableReplacedConstants* replacement_counts) { + LlvmCodeGen* codegen; + RETURN_IF_ERROR(state->GetCodegen(&codegen)); + + replacement_counts->stores_nulls = codegen->ReplaceCallSitesWithBoolConst( + fn, stores_nulls(), "stores_nulls"); + replacement_counts->finds_some_nulls = codegen->ReplaceCallSitesWithBoolConst( + fn, finds_some_nulls(), "finds_some_nulls"); + replacement_counts->stores_tuples = codegen->ReplaceCallSitesWithBoolConst( + fn, num_build_tuples == 1, "stores_tuples"); + replacement_counts->stores_duplicates = codegen->ReplaceCallSitesWithBoolConst( + fn, stores_duplicates, "stores_duplicates"); + replacement_counts->quadratic_probing = codegen->ReplaceCallSitesWithBoolConst( + fn, FLAGS_enable_quadratic_probing, "quadratic_probing"); + return Status::OK(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/exec/hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index 6fc4169..614917a 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -123,8 +123,8 @@ class HashTableCtx { /// space by not storing some rows we know will never match. HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs, const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls, - const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels, - MemTracker* tracker); + const std::vector<bool>& finds_nulls, int32_t initial_seed, + int max_levels, MemTracker* tracker); /// Create a hash table context with the specified parameters, invoke Init() to /// initialize the new hash table context and return it in 'ht_ctx'. Please see header @@ -190,6 +190,22 @@ class HashTableCtx { /// supports it (see hash-util.h). Status CodegenHashCurrentRow(RuntimeState* state, bool use_murmur, llvm::Function** fn); + /// Struct holding the number of call sites replaced by ReplaceHashTableConstants().
+ struct HashTableReplacedConstants { + int stores_nulls; + int finds_some_nulls; + int stores_tuples; + int stores_duplicates; + int quadratic_probing; + }; + + /// Replace hash table parameters with constants in 'fn'. Updates 'replacement_counts' + /// with the number of replacements made. 'num_build_tuples' and 'stores_duplicates' + /// correspond to HashTable parameters with the same name. + Status ReplaceHashTableConstants(RuntimeState* state, bool stores_duplicates, + int num_build_tuples, llvm::Function* fn, + HashTableReplacedConstants* replacement_counts); + static const char* LLVM_CLASS_NAME; /// To enable prefetching, the hash table building and probing are pipelined by the @@ -417,13 +433,15 @@ class HashTableCtx { /// Cross-compiled function to access member variables used in CodegenHashCurrentRow(). uint32_t GetHashSeed() const; + /// Functions to be replaced by codegen to specialize the hash table. + bool IR_NO_INLINE stores_nulls() const { return stores_nulls_; } + bool IR_NO_INLINE finds_some_nulls() const { return finds_some_nulls_; } + const std::vector<ExprContext*>& build_expr_ctxs_; const std::vector<ExprContext*>& probe_expr_ctxs_; /// Constants on how the hash table should behave. Joins and aggs have slightly /// different behavior. - /// TODO: these constants are an ideal candidate to be removed with codegen. - /// TODO: ..or with template-ization const bool stores_nulls_; const std::vector<bool> finds_nulls_; @@ -512,6 +530,8 @@ class HashTable { /// Returns a newly allocated HashTable. The probing algorithm is set by the /// FLAG_enable_quadratic_probing. /// - client: block mgr client to allocate data pages from. + /// - stores_duplicates: true if rows with duplicate keys may be inserted into the + /// hash table. /// - num_build_tuples: number of Tuples in the build tuple row. /// - tuple_stream: the tuple stream which contains the tuple rows index by the /// hash table. 
Can be NULL if the rows contain only a single tuple, in which @@ -522,8 +542,8 @@ class HashTable { /// - initial_num_buckets: number of buckets that the hash table should be initialized /// with. static HashTable* Create(RuntimeState* state, BufferedBlockMgr::Client* client, - int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets, - int64_t initial_num_buckets); + bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream, + int64_t max_num_buckets, int64_t initial_num_buckets); /// Allocates the initial bucket structure. Returns false if OOM. bool Init(); @@ -730,7 +750,7 @@ class HashTable { /// - quadratic_probing: set to true when the probing algorithm is quadratic, as /// opposed to linear. HashTable(bool quadratic_probing, RuntimeState* state, BufferedBlockMgr::Client* client, - int num_build_tuples, BufferedTupleStream* tuple_stream, + bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream, int64_t max_num_buckets, int64_t initial_num_buckets); /// Performs the probing operation according to the probing algorithm (linear or @@ -802,6 +822,11 @@ class HashTable { /// Grow the node array. Returns false on OOM. bool GrowNodeArray(); + /// Functions to be replaced by codegen to specialize the hash table. + bool IR_NO_INLINE stores_tuples() const { return stores_tuples_; } + bool IR_NO_INLINE stores_duplicates() const { return stores_duplicates_; } + bool IR_NO_INLINE quadratic_probing() const { return quadratic_probing_; } + /// Load factor that will trigger growing the hash table on insert. This is /// defined as the number of non-empty buckets / total_buckets static const double MAX_FILL_FACTOR; @@ -818,10 +843,11 @@ class HashTable { /// Constants on how the hash table should behave. Joins and aggs have slightly /// different behavior. - /// TODO: these constants are an ideal candidate to be removed with codegen. 
- /// TODO: ..or with template-ization const bool stores_tuples_; + /// True if duplicates may be inserted into hash table. + const bool stores_duplicates_; + /// Quadratic probing enabled (as opposed to linear). const bool quadratic_probing_; @@ -857,7 +883,6 @@ class HashTable { int64_t num_buckets_with_duplicates_; /// Number of build tuples, used for constructing temp row* for probes. - /// TODO: We should remove it. const int num_build_tuples_; /// Flag used to disable spilling hash tables that already had matches in case of http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/exec/hash-table.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h index 6d33869..7499da3 100644 --- a/be/src/exec/hash-table.inline.h +++ b/be/src/exec/hash-table.inline.h @@ -25,14 +25,14 @@ namespace impala { inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row) { bool has_null = EvalBuildRow(row); - if (!stores_nulls_ && has_null) return false; + if (!stores_nulls() && has_null) return false; expr_values_cache_.SetExprValuesHash(HashCurrentRow()); return true; } inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row) { bool has_null = EvalProbeRow(row); - if (has_null && !(stores_nulls_ && finds_some_nulls_)) return false; + if (has_null && !(stores_nulls() && finds_some_nulls())) return false; expr_values_cache_.SetExprValuesHash(HashCurrentRow()); return true; } @@ -72,7 +72,7 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets, // Move to the next bucket. ++step; ++travel_length_; - if (quadratic_probing_) { + if (quadratic_probing()) { // The i-th probe location is idx = (hash + (step * (step + 1)) / 2) mod num_buckets. // This gives num_buckets unique idxs (between 0 and N-1) when num_buckets is a power // of 2. 
@@ -82,7 +82,7 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets, } } while (LIKELY(step < num_buckets)); DCHECK_EQ(num_filled_buckets_, num_buckets) << "Probing of a non-full table " - << "failed: " << quadratic_probing_ << " " << hash; + << "failed: " << quadratic_probing() << " " << hash; return Iterator::BUCKET_NOT_FOUND; } @@ -108,7 +108,7 @@ inline bool HashTable::Insert(HashTableCtx* ht_ctx, HtData* htdata = InsertInternal(ht_ctx); // If successful insert, update the contents of the newly inserted entry with 'idx'. if (LIKELY(htdata != NULL)) { - if (stores_tuples_) { + if (stores_tuples()) { htdata->tuple = row->GetTuple(0); } else { htdata->idx = idx; @@ -136,7 +136,7 @@ inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx) { int64_t bucket_idx = Probe<false>(buckets_, num_buckets_, ht_ctx, hash, &found); if (found) { return Iterator(this, ht_ctx->scratch_row(), bucket_idx, - buckets_[bucket_idx].bucketData.duplicates); + stores_duplicates() ? buckets_[bucket_idx].bucketData.duplicates : NULL); } return End(); } @@ -147,8 +147,10 @@ inline HashTable::Iterator HashTable::FindBuildRowBucket( ++num_probes_; uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash(); int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, found); - DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND) ? - buckets_[bucket_idx].bucketData.duplicates : NULL; + DuplicateNode* duplicates = NULL; + if (stores_duplicates() && LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND)) { + duplicates = buckets_[bucket_idx].bucketData.duplicates; + } return Iterator(this, ht_ctx->scratch_row(), bucket_idx, duplicates); } @@ -167,8 +169,8 @@ inline HashTable::Iterator HashTable::FirstUnmatched(HashTableCtx* ctx) { // Check whether the bucket, or its first duplicate node, is matched. If it is not // matched, then return. Otherwise, move to the first unmatched entry (node or bucket). 
Bucket* bucket = &buckets_[bucket_idx]; - if ((!bucket->hasDuplicates && bucket->matched) || - (bucket->hasDuplicates && node->matched)) { + bool has_duplicates = stores_duplicates() && bucket->hasDuplicates; + if ((!has_duplicates && bucket->matched) || (has_duplicates && node->matched)) { it.NextUnmatched(); } return it; @@ -178,7 +180,7 @@ inline void HashTable::NextFilledBucket(int64_t* bucket_idx, DuplicateNode** nod ++*bucket_idx; for (; *bucket_idx < num_buckets_; ++*bucket_idx) { if (buckets_[*bucket_idx].filled) { - *node = buckets_[*bucket_idx].bucketData.duplicates; + *node = stores_duplicates() ? buckets_[*bucket_idx].bucketData.duplicates : NULL; return; } } @@ -212,6 +214,7 @@ inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_i DCHECK_LT(bucket_idx, num_buckets_); Bucket* bucket = &buckets_[bucket_idx]; DCHECK(bucket->filled); + DCHECK(stores_duplicates()); // Allocate one duplicate node for the new data and one for the preexisting data, // if needed. while (node_remaining_current_page_ < 1 + !bucket->hasDuplicates) { @@ -234,18 +237,19 @@ inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_i return AppendNextNode(bucket); } -inline TupleRow* HashTable::GetRow(HtData& htdata, TupleRow* row) const { - if (stores_tuples_) { +inline TupleRow* IR_ALWAYS_INLINE HashTable::GetRow(HtData& htdata, TupleRow* row) const { + if (stores_tuples()) { return reinterpret_cast<TupleRow*>(&htdata.tuple); } else { + // TODO: GetTupleRow() has interpreted code that iterates over the row's descriptor. 
tuple_stream_->GetTupleRow(htdata.idx, row); return row; } } -inline TupleRow* HashTable::GetRow(Bucket* bucket, TupleRow* row) const { +inline TupleRow* IR_ALWAYS_INLINE HashTable::GetRow(Bucket* bucket, TupleRow* row) const { DCHECK(bucket != NULL); - if (UNLIKELY(bucket->hasDuplicates)) { + if (UNLIKELY(stores_duplicates() && bucket->hasDuplicates)) { DuplicateNode* duplicate = bucket->bucketData.duplicates; DCHECK(duplicate != NULL); return GetRow(duplicate->htdata, row); @@ -254,12 +258,12 @@ inline TupleRow* HashTable::GetRow(Bucket* bucket, TupleRow* row) const { } } -inline TupleRow* HashTable::Iterator::GetRow() const { +inline TupleRow* IR_ALWAYS_INLINE HashTable::Iterator::GetRow() const { DCHECK(!AtEnd()); DCHECK(table_ != NULL); DCHECK(scratch_row_ != NULL); Bucket* bucket = &table_->buckets_[bucket_idx_]; - if (UNLIKELY(bucket->hasDuplicates)) { + if (UNLIKELY(table_->stores_duplicates() && bucket->hasDuplicates)) { DCHECK(node_ != NULL); return table_->GetRow(node_->htdata, scratch_row_); } else { @@ -267,12 +271,12 @@ inline TupleRow* HashTable::Iterator::GetRow() const { } } -inline Tuple* HashTable::Iterator::GetTuple() const { +inline Tuple* IR_ALWAYS_INLINE HashTable::Iterator::GetTuple() const { DCHECK(!AtEnd()); - DCHECK(table_->stores_tuples_); + DCHECK(table_->stores_tuples()); Bucket* bucket = &table_->buckets_[bucket_idx_]; // TODO: To avoid the hasDuplicates check, store the HtData* in the Iterator. 
- if (UNLIKELY(bucket->hasDuplicates)) { + if (UNLIKELY(table_->stores_duplicates() && bucket->hasDuplicates)) { DCHECK(node_ != NULL); return node_->htdata.tuple; } else { @@ -282,7 +286,7 @@ inline Tuple* HashTable::Iterator::GetTuple() const { inline void HashTable::Iterator::SetTuple(Tuple* tuple, uint32_t hash) { DCHECK(!AtEnd()); - DCHECK(table_->stores_tuples_); + DCHECK(table_->stores_tuples()); table_->PrepareBucketForInsert(bucket_idx_, hash); table_->buckets_[bucket_idx_].bucketData.htdata.tuple = tuple; } @@ -290,7 +294,7 @@ inline void HashTable::Iterator::SetTuple(Tuple* tuple, uint32_t hash) { inline void HashTable::Iterator::SetMatched() { DCHECK(!AtEnd()); Bucket* bucket = &table_->buckets_[bucket_idx_]; - if (bucket->hasDuplicates) { + if (table_->stores_duplicates() && bucket->hasDuplicates) { node_->matched = true; } else { bucket->matched = true; @@ -303,7 +307,7 @@ inline void HashTable::Iterator::SetMatched() { inline bool HashTable::Iterator::IsMatched() const { DCHECK(!AtEnd()); Bucket* bucket = &table_->buckets_[bucket_idx_]; - if (bucket->hasDuplicates) { + if (table_->stores_duplicates() && bucket->hasDuplicates) { return node_->matched; } return bucket->matched; @@ -326,7 +330,8 @@ inline void HashTable::Iterator::PrefetchBucket() { inline void HashTable::Iterator::Next() { DCHECK(!AtEnd()); - if (table_->buckets_[bucket_idx_].hasDuplicates && node_->next != NULL) { + if (table_->stores_duplicates() && table_->buckets_[bucket_idx_].hasDuplicates && + node_->next != NULL) { node_ = node_->next; } else { table_->NextFilledBucket(&bucket_idx_, &node_); @@ -335,7 +340,8 @@ inline void HashTable::Iterator::Next() { inline void HashTable::Iterator::NextDuplicate() { DCHECK(!AtEnd()); - if (table_->buckets_[bucket_idx_].hasDuplicates && node_->next != NULL) { + if (table_->stores_duplicates() && table_->buckets_[bucket_idx_].hasDuplicates && + node_->next != NULL) { node_ = node_->next; } else { bucket_idx_ = BUCKET_NOT_FOUND; @@ -347,7 +353,7 
@@ inline void HashTable::Iterator::NextUnmatched() { DCHECK(!AtEnd()); Bucket* bucket = &table_->buckets_[bucket_idx_]; // Check if there is any remaining unmatched duplicate node in the current bucket. - if (bucket->hasDuplicates) { + if (table_->stores_duplicates() && bucket->hasDuplicates) { while (node_->next != NULL) { node_ = node_->next; if (!node_->matched) return; @@ -358,7 +364,7 @@ inline void HashTable::Iterator::NextUnmatched() { table_->NextFilledBucket(&bucket_idx_, &node_); while (bucket_idx_ != Iterator::BUCKET_NOT_FOUND) { bucket = &table_->buckets_[bucket_idx_]; - if (!bucket->hasDuplicates) { + if (!table_->stores_duplicates() || !bucket->hasDuplicates) { if (!bucket->matched) return; } else { while (node_->matched && node_->next != NULL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index 0a6a811..c2c884a 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -763,9 +763,9 @@ bool PartitionedAggregationNode::Partition::InitHashTable() { // though. Always start with small buffers. // TODO: How many buckets? We currently use a default value, 1024. 
static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024; - - hash_tbl.reset(HashTable::Create(parent->state_, parent->block_mgr_client_, 1, - NULL, 1L << (32 - NUM_PARTITIONING_BITS), PAGG_DEFAULT_HASH_TABLE_SZ)); + hash_tbl.reset(HashTable::Create(parent->state_, parent->block_mgr_client_, + false, 1, NULL, 1L << (32 - NUM_PARTITIONING_BITS), + PAGG_DEFAULT_HASH_TABLE_SZ)); return hash_tbl->Init(); } @@ -1841,6 +1841,16 @@ Status PartitionedAggregationNode::CodegenProcessBatch() { replaced = codegen->ReplaceCallSites(process_batch_fn, build_equals_fn, "Equals"); DCHECK_EQ(replaced, 1); + + HashTableCtx::HashTableReplacedConstants replaced_constants; + const bool stores_duplicates = false; + RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state_, stores_duplicates, 1, + process_batch_fn, &replaced_constants)); + DCHECK_GE(replaced_constants.stores_nulls, 1); + DCHECK_GE(replaced_constants.finds_some_nulls, 1); + DCHECK_GE(replaced_constants.stores_duplicates, 1); + DCHECK_GE(replaced_constants.stores_tuples, 1); + DCHECK_GE(replaced_constants.quadratic_probing, 1); } replaced = codegen->ReplaceCallSites(process_batch_fn, update_tuple_fn, "UpdateTuple"); @@ -1910,6 +1920,16 @@ Status PartitionedAggregationNode::CodegenProcessBatchStreaming() { replaced = codegen->ReplaceCallSites(process_batch_streaming_fn, equals_fn, "Equals"); DCHECK_EQ(replaced, 1); + HashTableCtx::HashTableReplacedConstants replaced_constants; + const bool stores_duplicates = false; + RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state_, stores_duplicates, 1, + process_batch_streaming_fn, &replaced_constants)); + DCHECK_GE(replaced_constants.stores_nulls, 1); + DCHECK_GE(replaced_constants.finds_some_nulls, 1); + DCHECK_GE(replaced_constants.stores_duplicates, 1); + DCHECK_GE(replaced_constants.stores_tuples, 1); + DCHECK_GE(replaced_constants.quadratic_probing, 1); + DCHECK(process_batch_streaming_fn != NULL); process_batch_streaming_fn = 
codegen->FinalizeFunction(process_batch_streaming_fn); if (process_batch_streaming_fn == NULL) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/265e39f8/be/src/exec/partitioned-hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 1888e06..6f29008 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -455,6 +455,7 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state, int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ? HashTable::EstimateNumBuckets(build_rows()->num_rows()) : state->batch_size() * 2; hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_, + true /* stores_duplicates */, parent_->child(1)->row_desc().tuple_descriptors().size(), build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets)); if (!hash_tbl_->Init()) goto not_built; @@ -1660,6 +1661,18 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state, "EvalBuildRow"); DCHECK_EQ(replaced, 1); + // Replace some hash table parameters with constants.
+ HashTableCtx::HashTableReplacedConstants replaced_constants; + const bool stores_duplicates = true; + const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size(); + RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates, + num_build_tuples, process_build_batch_fn, &replaced_constants)); + DCHECK_GE(replaced_constants.stores_nulls, 1); + DCHECK_EQ(replaced_constants.finds_some_nulls, 0); + DCHECK_EQ(replaced_constants.stores_duplicates, 0); + DCHECK_EQ(replaced_constants.stores_tuples, 0); + DCHECK_EQ(replaced_constants.quadratic_probing, 0); + Function* process_build_batch_fn_level0 = codegen->CloneFunction(process_build_batch_fn); @@ -1836,6 +1849,18 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch( // TODO: switch statement DCHECK(replaced == 1 || replaced == 2 || replaced == 3 || replaced == 4) << replaced; + // Replace hash-table parameters with constants. + HashTableCtx::HashTableReplacedConstants replaced_constants; + const bool stores_duplicates = true; + const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size(); + RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates, + num_build_tuples, process_probe_batch_fn, &replaced_constants)); + DCHECK_GE(replaced_constants.stores_nulls, 1); + DCHECK_GE(replaced_constants.finds_some_nulls, 1); + DCHECK_GE(replaced_constants.stores_duplicates, 1); + DCHECK_GE(replaced_constants.stores_tuples, 1); + DCHECK_GE(replaced_constants.quadratic_probing, 1); + Function* process_probe_batch_fn_level0 = codegen->CloneFunction(process_probe_batch_fn); @@ -1895,6 +1920,18 @@ Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state, replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals"); DCHECK_EQ(replaced, 1); + // Replace hash-table parameters with constants. 
+ HashTableCtx::HashTableReplacedConstants replaced_constants; + const bool stores_duplicates = true; + const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size(); + RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates, + num_build_tuples, insert_batch_fn, &replaced_constants)); + DCHECK_GE(replaced_constants.stores_nulls, 1); + DCHECK_EQ(replaced_constants.finds_some_nulls, 0); + DCHECK_GE(replaced_constants.stores_duplicates, 1); + DCHECK_GE(replaced_constants.stores_tuples, 1); + DCHECK_GE(replaced_constants.quadratic_probing, 1); + Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn); // Use codegen'd hash functions
