http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc deleted file mode 100644 index c35d05d..0000000 --- a/be/src/exec/hash-join-node.cc +++ /dev/null @@ -1,673 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "exec/hash-join-node.h" - -#include <functional> -#include <numeric> -#include <sstream> - -#include "codegen/llvm-codegen.h" -#include "exec/old-hash-table.inline.h" -#include "exprs/scalar-expr.h" -#include "gutil/strings/substitute.h" -#include "runtime/mem-tracker.h" -#include "runtime/row-batch.h" -#include "runtime/runtime-filter.h" -#include "runtime/runtime-filter-bank.h" -#include "runtime/runtime-state.h" -#include "runtime/tuple-row.h" -#include "util/debug-util.h" -#include "util/bloom-filter.h" -#include "util/runtime-profile-counters.h" - -#include "gen-cpp/PlanNodes_types.h" - -#include "common/names.h" - -DEFINE_bool(enable_probe_side_filtering, true, "Deprecated."); - -using namespace impala; -using namespace llvm; -using namespace strings; - -const char* HashJoinNode::LLVM_CLASS_NAME = "class.impala::HashJoinNode"; - -HashJoinNode::HashJoinNode( - ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : BlockingJoinNode("HashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs), - is_not_distinct_from_(), - codegen_process_build_batch_fn_(NULL), - process_build_batch_fn_(NULL), - process_probe_batch_fn_(NULL) { - // The hash join node does not support cross or anti joins - DCHECK_NE(join_op_, TJoinOp::CROSS_JOIN); - DCHECK_NE(join_op_, TJoinOp::LEFT_ANTI_JOIN); - DCHECK_NE(join_op_, TJoinOp::RIGHT_SEMI_JOIN); - DCHECK_NE(join_op_, TJoinOp::RIGHT_ANTI_JOIN); - DCHECK_NE(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN); - - match_all_probe_ = - join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN; - match_one_build_ = join_op_ == TJoinOp::LEFT_SEMI_JOIN; - match_all_build_ = - join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN; -} - -Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state)); - DCHECK(tnode.__isset.hash_join_node); - const vector<TEqJoinCondition>& eq_join_conjuncts = - 
tnode.hash_join_node.eq_join_conjuncts; - - for (int i = 0; i < eq_join_conjuncts.size(); ++i) { - ScalarExpr* probe_expr; - RETURN_IF_ERROR(ScalarExpr::Create( - eq_join_conjuncts[i].left, *child(0)->row_desc(), state, &probe_expr)); - probe_exprs_.push_back(probe_expr); - ScalarExpr* build_expr; - RETURN_IF_ERROR(ScalarExpr::Create( - eq_join_conjuncts[i].right, *child(1)->row_desc(), state, &build_expr)); - build_exprs_.push_back(build_expr); - is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from); - } - - // other_join_conjunct_evals_ are evaluated in the context of rows assembled from - // all build and probe tuples; full_row_desc is not necessarily the same as the output - // row desc, e.g., because semi joins only return the build xor probe tuples - RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc()); - RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts, - full_row_desc, state, &other_join_conjuncts_)); - - for (const TRuntimeFilterDesc& tfilter: tnode.runtime_filters) { - // If filter propagation not enabled, only consider building broadcast joins (that may - // be consumed by this fragment). 
- if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL && - !tfilter.is_broadcast_join) { - continue; - } - if (state->query_options().disable_row_runtime_filtering && - !tfilter.applied_on_partition_columns) { - continue; - } - filters_.push_back(state->filter_bank()->RegisterFilter(tfilter, true)); - ScalarExpr* filter_expr; - RETURN_IF_ERROR( - ScalarExpr::Create(tfilter.src_expr, *child(1)->row_desc(), state, &filter_expr)); - filter_exprs_.push_back(filter_expr); - } - return Status::OK(); -} - -Status HashJoinNode::Prepare(RuntimeState* state) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); - RETURN_IF_ERROR(BlockingJoinNode::Prepare(state)); - - build_buckets_counter_ = - ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT); - hash_tbl_load_factor_counter_ = - ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE); - - // build and probe exprs are evaluated in the context of the rows produced by our - // right and left children, respectively - RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_, - expr_mem_pool(), &other_join_conjunct_evals_)); - AddEvaluatorsToFree(other_join_conjunct_evals_); - - // TODO: default buckets - const bool stores_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN - || join_op_ == TJoinOp::FULL_OUTER_JOIN - || std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), - false, std::logical_or<bool>()); - - RETURN_IF_ERROR(OldHashTable::Create(pool_, state, build_exprs_, probe_exprs_, - filter_exprs_, child(1)->row_desc()->tuple_descriptors().size(), stores_nulls, - is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_, - &hash_tbl_)); - build_pool_.reset(new MemPool(mem_tracker())); - AddCodegenDisabledMessage(state); - return Status::OK(); -} - -void HashJoinNode::Codegen(RuntimeState* state) { - DCHECK(state->ShouldCodegen()); - ExecNode::Codegen(state); - if (IsNodeCodegenDisabled()) return; - - LlvmCodeGen* 
codegen = state->codegen(); - DCHECK(codegen != NULL); - bool build_codegen_enabled = false; - bool probe_codegen_enabled = false; - - // Codegen for hashing rows - Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen); - if (hash_fn != NULL) { - // Codegen for build path - codegen_process_build_batch_fn_ = CodegenProcessBuildBatch(codegen, hash_fn); - if (codegen_process_build_batch_fn_ != NULL) { - codegen->AddFunctionToJit(codegen_process_build_batch_fn_, - reinterpret_cast<void**>(&process_build_batch_fn_)); - build_codegen_enabled = true; - } - - // Codegen for probe path (only for left joins) - if (!match_all_build_) { - Function* codegen_process_probe_batch_fn = - CodegenProcessProbeBatch(codegen, hash_fn); - if (codegen_process_probe_batch_fn != NULL) { - codegen->AddFunctionToJit(codegen_process_probe_batch_fn, - reinterpret_cast<void**>(&process_probe_batch_fn_)); - probe_codegen_enabled = true; - } - } - } - runtime_profile()->AddCodegenMsg(build_codegen_enabled, "", "Build Side"); - runtime_profile()->AddCodegenMsg(probe_codegen_enabled, "", "Probe Side"); -} - -Status HashJoinNode::Reset(RuntimeState* state) { - DCHECK(false) << "NYI"; - return Status("NYI"); -} - -void HashJoinNode::Close(RuntimeState* state) { - if (is_closed()) return; - if (hash_tbl_.get() != NULL) hash_tbl_->Close(state); - if (build_pool_.get() != NULL) build_pool_->FreeAll(); - ScalarExprEvaluator::Close(other_join_conjunct_evals_, state); - ScalarExpr::Close(probe_exprs_); - ScalarExpr::Close(build_exprs_); - ScalarExpr::Close(other_join_conjuncts_); - ScalarExpr::Close(filter_exprs_); - BlockingJoinNode::Close(state); -} - -Status HashJoinNode::Open(RuntimeState* state) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); - RETURN_IF_ERROR(BlockingJoinNode::Open(state)); - RETURN_IF_ERROR(hash_tbl_->Open(state)); - RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state)); - - // Check for errors and free local allocations before opening 
children. - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(QueryMaintenance(state)); - - RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, NULL)); - RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state)); - InitGetNext(); - return Status::OK(); -} - -Status HashJoinNode::QueryMaintenance(RuntimeState* state) { - if (hash_tbl_.get() != nullptr) hash_tbl_->FreeLocalAllocations(); - return ExecNode::QueryMaintenance(state); -} - -Status HashJoinNode::ProcessBuildInput(RuntimeState* state) { - // Do a full scan of child(1) and store everything in hash_tbl_ - // The hash join node needs to keep in memory all build tuples, including the tuple - // row ptrs. The row ptrs are copied into the hash table's internal structure so they - // don't need to be stored in the build_pool_. - RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker()); - while (true) { - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(QueryMaintenance(state)); - bool eos; - { - SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_); - RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos)); - } - SCOPED_TIMER(build_timer_); - // take ownership of tuple data of build_batch - build_pool_->AcquireData(build_batch.tuple_data_pool(), false); - RETURN_IF_ERROR(QueryMaintenance(state)); - - // Call codegen version if possible - if (process_build_batch_fn_ == NULL) { - ProcessBuildBatch(&build_batch); - } else { - process_build_batch_fn_(this, &build_batch); - } - VLOG_ROW << hash_tbl_->DebugString(true, false, child(1)->row_desc()); - - COUNTER_SET(build_row_counter_, hash_tbl_->size()); - COUNTER_SET(build_buckets_counter_, hash_tbl_->num_buckets()); - COUNTER_SET(hash_tbl_load_factor_counter_, hash_tbl_->load_factor()); - build_batch.Reset(); - DCHECK(!build_batch.AtCapacity()); - if (eos) break; - } - - if (filters_.size() > 0) { - int num_enabled_filters = hash_tbl_->AddBloomFilters(); - if (num_enabled_filters == filters_.size()) { - 
runtime_profile()->AppendExecOption( - Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(), - filters_.size() == 1 ? "" : "s")); - } else { - string exec_option = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled", - num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s", - filters_.size() - num_enabled_filters); - runtime_profile()->AppendExecOption(exec_option); - } - } - - return Status::OK(); -} - -void HashJoinNode::InitGetNext() { - if (current_probe_row_ == NULL) { - hash_tbl_iterator_ = hash_tbl_->Begin(); - } else { - matched_probe_ = false; - hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_); - } -} - -Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) { - SCOPED_TIMER(runtime_profile_->total_time_counter()); - RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(QueryMaintenance(state)); - if (ReachedLimit()) { - *eos = true; - return Status::OK(); - } - *eos = false; - - // These cases are simpler and use a more efficient processing loop - if (!match_all_build_) { - if (eos_) { - *eos = true; - return Status::OK(); - } - return LeftJoinGetNext(state, out_batch, eos); - } - - const int num_other_conjuncts = other_join_conjuncts_.size(); - DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size()); - - const int num_conjuncts = conjuncts_.size(); - DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); - - // Explicitly manage the timer counter to avoid measuring time in the child - // GetNext call. 
- ScopedTimer<MonotonicStopWatch> probe_timer(probe_timer_); - - while (!eos_) { - // create output rows as long as: - // 1) we haven't already created an output row for the probe row and are doing - // a semi-join; - // 2) there are more matching build rows - while (!hash_tbl_iterator_.AtEnd()) { - int row_idx = out_batch->AddRow(); - TupleRow* out_row = out_batch->GetRow(row_idx); - - TupleRow* matched_build_row = hash_tbl_iterator_.GetRow(); - CreateOutputRow(out_row, current_probe_row_, matched_build_row); - if (!EvalConjuncts(other_join_conjunct_evals_.data(), - num_other_conjuncts, out_row)) { - hash_tbl_iterator_.Next<true>(); - continue; - } - // we have a match for the purpose of the (outer?) join as soon as we - // satisfy the JOIN clause conjuncts - matched_probe_ = true; - if (match_all_build_) { - // remember that we matched this build row - hash_tbl_iterator_.set_matched(true); - VLOG_ROW << "joined build row: " << matched_build_row; - } - - hash_tbl_iterator_.Next<true>(); - if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) { - out_batch->CommitLastRow(); - VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc()); - ++num_rows_returned_; - COUNTER_SET(rows_returned_counter_, num_rows_returned_); - if (out_batch->AtCapacity() || ReachedLimit()) { - *eos = ReachedLimit(); - return Status::OK(); - } - } - } - - // If a probe row exists at this point, check whether we need to output the current - // probe row before getting a new probe batch. 
(IMPALA-2440) - bool probe_row_exists = probe_batch_->num_rows() > 0; - if (match_all_probe_ && !matched_probe_ && probe_row_exists) { - int row_idx = out_batch->AddRow(); - TupleRow* out_row = out_batch->GetRow(row_idx); - CreateOutputRow(out_row, current_probe_row_, NULL); - if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) { - out_batch->CommitLastRow(); - VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc()); - ++num_rows_returned_; - COUNTER_SET(rows_returned_counter_, num_rows_returned_); - matched_probe_ = true; - if (out_batch->AtCapacity() || ReachedLimit()) { - *eos = ReachedLimit(); - return Status::OK(); - } - } - } - - if (probe_batch_pos_ == probe_batch_->num_rows()) { - // pass on resources, out_batch might still need them - probe_batch_->TransferResourceOwnership(out_batch); - probe_batch_pos_ = 0; - if (out_batch->AtCapacity()) return Status::OK(); - // get new probe batch - if (!probe_side_eos_) { - while (true) { - probe_timer.Stop(); - RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_)); - probe_timer.Start(); - if (probe_batch_->num_rows() == 0) { - // Empty batches can still contain IO buffers, which need to be passed up to - // the caller; transferring resources can fill up out_batch. 
- probe_batch_->TransferResourceOwnership(out_batch); - if (probe_side_eos_) { - eos_ = true; - break; - } - if (out_batch->AtCapacity()) return Status::OK(); - continue; - } else { - COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows()); - break; - } - } - } else { - eos_ = true; - } - // finish up right outer join - if (eos_ && match_all_build_) { - hash_tbl_iterator_ = hash_tbl_->Begin(); - } - } - - if (eos_) break; - - // join remaining rows in probe batch_ - current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++); - VLOG_ROW << "probe row: " << GetLeftChildRowString(current_probe_row_); - matched_probe_ = false; - hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_); - } - - *eos = true; - if (match_all_build_) { - // output remaining unmatched build rows - TupleRow* build_row = NULL; - while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) { - build_row = hash_tbl_iterator_.GetRow(); - bool matched = hash_tbl_iterator_.matched(); - hash_tbl_iterator_.Next<false>(); - if (matched) continue; - - int row_idx = out_batch->AddRow(); - TupleRow* out_row = out_batch->GetRow(row_idx); - CreateOutputRow(out_row, NULL, build_row); - if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) { - out_batch->CommitLastRow(); - VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc()); - ++num_rows_returned_; - COUNTER_SET(rows_returned_counter_, num_rows_returned_); - if (ReachedLimit()) { - *eos = true; - return Status::OK(); - } - } - } - // we're done if there are no more rows left to check - *eos = hash_tbl_iterator_.AtEnd(); - } - return Status::OK(); -} - -Status HashJoinNode::LeftJoinGetNext(RuntimeState* state, - RowBatch* out_batch, bool* eos) { - *eos = eos_; - - ScopedTimer<MonotonicStopWatch> probe_timer(probe_timer_); - while (!eos_) { - // Compute max rows that should be added to out_batch - int64_t max_added_rows = out_batch->capacity() - out_batch->num_rows(); - if (limit() != -1) max_added_rows = min(max_added_rows, 
limit() - rows_returned()); - - // Continue processing this row batch - if (process_probe_batch_fn_ == NULL) { - num_rows_returned_ += - ProcessProbeBatch(out_batch, probe_batch_.get(), max_added_rows); - } else { - // Use codegen'd function - num_rows_returned_ += - process_probe_batch_fn_(this, out_batch, probe_batch_.get(), max_added_rows); - } - COUNTER_SET(rows_returned_counter_, num_rows_returned_); - - if (ReachedLimit() || out_batch->AtCapacity()) { - *eos = ReachedLimit(); - break; - } - - // Check to see if we're done processing the current probe batch - if (hash_tbl_iterator_.AtEnd() && probe_batch_pos_ == probe_batch_->num_rows()) { - probe_batch_->TransferResourceOwnership(out_batch); - probe_batch_pos_ = 0; - if (out_batch->AtCapacity()) break; - if (probe_side_eos_) { - *eos = eos_ = true; - break; - } else { - probe_timer.Stop(); - RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_)); - probe_timer.Start(); - COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows()); - } - } - } - - return Status::OK(); -} - -void HashJoinNode::AddToDebugString(int indentation_level, stringstream* out) const { - *out << " hash_tbl="; - *out << string(indentation_level * 2, ' '); - *out << "HashTbl(" - << " build_exprs=" << ScalarExpr::DebugString(build_exprs_) - << " probe_exprs=" << ScalarExpr::DebugString(probe_exprs_); - *out << ")"; -} - -// This codegen'd function should only be used for left join cases so it assumes that -// the probe row is non-null. 
For a left outer join, the IR looks like: -// define void @CreateOutputRow(%"class.impala::BlockingJoinNode"* %this_ptr, -// %"class.impala::TupleRow"* %out_arg, -// %"class.impala::TupleRow"* %probe_arg, -// %"class.impala::TupleRow"* %build_arg) { -// entry: -// %out = bitcast %"class.impala::TupleRow"* %out_arg to i8** -// %probe = bitcast %"class.impala::TupleRow"* %probe_arg to i8** -// %build = bitcast %"class.impala::TupleRow"* %build_arg to i8** -// %0 = bitcast i8** %out to i8* -// %1 = bitcast i8** %probe to i8* -// call void @llvm.memcpy.p0i8.p0i8.i32(i8* %0, i8* %1, i32 16, i32 16, i1 false) -// %is_build_null = icmp eq i8** %build, null -// br i1 %is_build_null, label %build_null, label %build_not_null -// -// build_not_null: ; preds = %entry -// %dst_tuple_ptr1 = getelementptr i8** %out, i32 1 -// %src_tuple_ptr = getelementptr i8** %build, i32 0 -// %2 = load i8** %src_tuple_ptr -// store i8* %2, i8** %dst_tuple_ptr1 -// ret void -// -// build_null: ; preds = %entry -// %dst_tuple_ptr = getelementptr i8** %out, i32 1 -// call void @llvm.memcpy.p0i8.p0i8.i32( -// i8* %dst_tuple_ptr, i8* %1, i32 16, i32 16, i1 false) -// ret void -// } -Function* HashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen) { - Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); - DCHECK(tuple_row_type != NULL); - PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); - - Type* this_type = codegen->GetType(BlockingJoinNode::LLVM_CLASS_NAME); - DCHECK(this_type != NULL); - PointerType* this_ptr_type = PointerType::get(this_type, 0); - - // TupleRows are really just an array of pointers. Easier to work with them - // this way. 
- PointerType* tuple_row_working_type = PointerType::get(codegen->ptr_type(), 0); - - // Construct function signature to match CreateOutputRow() - LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type()); - prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("out_arg", tuple_row_ptr_type)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("probe_arg", tuple_row_ptr_type)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("build_arg", tuple_row_ptr_type)); - - LLVMContext& context = codegen->context(); - LlvmBuilder builder(context); - Value* args[4]; - Function* fn = prototype.GeneratePrototype(&builder, args); - Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type, "out"); - Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type, "probe"); - Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type, "build"); - - int num_probe_tuples = child(0)->row_desc()->tuple_descriptors().size(); - int num_build_tuples = child(1)->row_desc()->tuple_descriptors().size(); - - // Copy probe row - codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, probe_tuple_row_size_); - Value* build_row_idx[] = {codegen->GetIntConstant(TYPE_INT, num_probe_tuples)}; - Value* build_row_dst = - builder.CreateInBoundsGEP(out_row_arg, build_row_idx, "build_dst_ptr"); - - // Copy build row. 
- BasicBlock* build_not_null_block = BasicBlock::Create(context, "build_not_null", fn); - BasicBlock* build_null_block = NULL; - - if (match_all_probe_) { - // build tuple can be null - build_null_block = BasicBlock::Create(context, "build_null", fn); - Value* is_build_null = builder.CreateIsNull(build_row_arg, "is_build_null"); - builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block); - - // Set tuple build ptrs to NULL - // TODO: this should be replaced with memset() but I can't get the llvm intrinsic - // to work. - builder.SetInsertPoint(build_null_block); - for (int i = 0; i < num_build_tuples; ++i) { - Value* array_idx[] = {codegen->GetIntConstant(TYPE_INT, i + num_probe_tuples)}; - Value* dst = builder.CreateInBoundsGEP(out_row_arg, array_idx, "dst_tuple_ptr"); - builder.CreateStore(codegen->null_ptr_value(), dst); - } - builder.CreateRetVoid(); - } else { - // build row can't be NULL - builder.CreateBr(build_not_null_block); - } - - // Copy build tuple ptrs - builder.SetInsertPoint(build_not_null_block); - codegen->CodegenMemcpy(&builder, build_row_dst, build_row_arg, build_tuple_row_size_); - builder.CreateRetVoid(); - - return codegen->FinalizeFunction(fn); -} - -Function* HashJoinNode::CodegenProcessBuildBatch(LlvmCodeGen* codegen, - Function* hash_fn) { - // Get cross compiled function - Function* process_build_batch_fn = - codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH, true); - DCHECK(process_build_batch_fn != NULL); - - // Codegen for evaluating build rows - Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true); - if (eval_row_fn == NULL) return NULL; - - int replaced = codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn, - "EvalBuildRow"); - DCHECK_EQ(replaced, 1); - - replaced = codegen->ReplaceCallSites(process_build_batch_fn, hash_fn, "HashCurrentRow"); - DCHECK_EQ(replaced, 1); - - return codegen->FinalizeFunction(process_build_batch_fn); -} - -Function* 
HashJoinNode::CodegenProcessProbeBatch(LlvmCodeGen* codegen, - Function* hash_fn) { - // Get cross compiled function - Function* process_probe_batch_fn = - codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH, true); - DCHECK(process_probe_batch_fn != NULL); - - // Codegen HashTable::Equals() - Function* equals_fn = hash_tbl_->CodegenEquals(codegen); - if (equals_fn == NULL) return NULL; - - // Codegen for evaluating build rows - Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false); - if (eval_row_fn == NULL) return NULL; - - // Codegen CreateOutputRow() - Function* create_output_row_fn = CodegenCreateOutputRow(codegen); - if (create_output_row_fn == NULL) return NULL; - - // Codegen evaluating other join conjuncts - Function* eval_other_conjuncts_fn; - Status status = ExecNode::CodegenEvalConjuncts(codegen, other_join_conjuncts_, - &eval_other_conjuncts_fn, "EvalOtherConjuncts"); - if (!status.ok()) return NULL; - - // Codegen evaluating conjuncts - Function* eval_conjuncts_fn; - status = ExecNode::CodegenEvalConjuncts(codegen, conjuncts_, &eval_conjuncts_fn); - if (!status.ok()) return NULL; - - // Replace all call sites with codegen version - int replaced = codegen->ReplaceCallSites(process_probe_batch_fn, hash_fn, - "HashCurrentRow"); - DCHECK_EQ(replaced, 1); - - replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_row_fn, - "EvalProbeRow"); - DCHECK_EQ(replaced, 1); - - replaced = codegen->ReplaceCallSites(process_probe_batch_fn, create_output_row_fn, - "CreateOutputRow"); - DCHECK_EQ(replaced, 3); - - replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_conjuncts_fn, - "EvalConjuncts"); - DCHECK_EQ(replaced, 2); - - replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_other_conjuncts_fn, - "EvalOtherJoinConjuncts"); - DCHECK_EQ(replaced, 2); - - replaced = codegen->ReplaceCallSites(process_probe_batch_fn, equals_fn, "Equals"); - DCHECK_EQ(replaced, 2); - - return 
codegen->FinalizeFunction(process_probe_batch_fn); -}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-join-node.h b/be/src/exec/hash-join-node.h deleted file mode 100644 index b49f8bb..0000000 --- a/be/src/exec/hash-join-node.h +++ /dev/null @@ -1,164 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- - -#ifndef IMPALA_EXEC_HASH_JOIN_NODE_H -#define IMPALA_EXEC_HASH_JOIN_NODE_H - -#include <boost/scoped_ptr.hpp> -#include <boost/thread.hpp> -#include <string> - -#include "exec/exec-node.h" -#include "exec/old-hash-table.h" -#include "exec/blocking-join-node.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "util/promise.h" - -#include "gen-cpp/PlanNodes_types.h" // for TJoinOp - -namespace impala { - -class MemPool; -class RowBatch; -class ScalarExpr; -class ScalarExprEvaluator; -class TupleRow; - -/// Node for in-memory hash joins: -/// - builds up a hash table with the rows produced by our right input -/// (child(1)); build exprs are the rhs exprs of our equi-join predicates -/// - for each row from our left input, probes the hash table to retrieve -/// matching entries; the probe exprs are the lhs exprs of our equi-join predicates -// -/// Row batches: -/// - In general, we are not able to pass our output row batch on to our left child (when -/// we're fetching the probe rows): if we have a 1xn join, our output will contain -/// multiple rows per left input row -/// - TODO: fix this, so in the case of 1x1/nx1 joins (for instance, fact to dimension tbl) -/// we don't do these extra copies -class HashJoinNode : public BlockingJoinNode { - public: - HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - - virtual Status Init(const TPlanNode& tnode, RuntimeState* state); - virtual Status Prepare(RuntimeState* state); - virtual void Codegen(RuntimeState* state); - virtual Status Open(RuntimeState* state); - virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status Reset(RuntimeState* state); - virtual void Close(RuntimeState* state); - - static const char* LLVM_CLASS_NAME; - - protected: - virtual Status QueryMaintenance(RuntimeState* state); - virtual void AddToDebugString(int indentation_level, std::stringstream* out) const; - virtual Status 
ProcessBuildInput(RuntimeState* state); - - private: - boost::scoped_ptr<OldHashTable> hash_tbl_; - OldHashTable::Iterator hash_tbl_iterator_; - - /// holds everything referenced from build side - boost::scoped_ptr<MemPool> build_pool_; - - /// our equi-join predicates "<lhs> = <rhs>" are separated into - /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0)) - std::vector<ScalarExpr*> probe_exprs_; - std::vector<ScalarExpr*> build_exprs_; - - /// Expressions used to build runtime filters, one per entry in filters_. - std::vector<ScalarExpr*> filter_exprs_; - - /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS - /// NOT DISTINCT FROM, rather than equality. - std::vector<bool> is_not_distinct_from_; - - /// non-equi-join conjuncts from the JOIN clause - std::vector<ScalarExpr*> other_join_conjuncts_; - std::vector<ScalarExprEvaluator*> other_join_conjunct_evals_; - - /// Derived from join_op_ - /// Output all rows coming from the probe input. Used in LEFT_OUTER_JOIN and - /// FULL_OUTER_JOIN. - bool match_all_probe_; - - /// Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN. - bool match_one_build_; - - /// Output all rows coming from the build input. Used in RIGHT_OUTER_JOIN and - /// FULL_OUTER_JOIN. - bool match_all_build_; - - /// llvm function for build batch - llvm::Function* codegen_process_build_batch_fn_; - - /// Function declaration for codegen'd function. Signature must match - /// HashJoinNode::ProcessBuildBatch - typedef void (*ProcessBuildBatchFn)(HashJoinNode*, RowBatch*); - ProcessBuildBatchFn process_build_batch_fn_; - - /// HashJoinNode::ProcessProbeBatch() exactly - typedef int (*ProcessProbeBatchFn)(HashJoinNode*, RowBatch*, RowBatch*, int); - /// Jitted ProcessProbeBatch function pointer. Null if codegen is disabled. - ProcessProbeBatchFn process_probe_batch_fn_; - - /// RuntimeFilters to build. 
- std::vector<RuntimeFilter*> filters_; - - RuntimeProfile::Counter* build_buckets_counter_; // num buckets in hash table - RuntimeProfile::Counter* hash_tbl_load_factor_counter_; - - /// Prepares for the first call to GetNext(). Must be called after GetFirstProbeRow(). - void InitGetNext(); - - /// GetNext helper function for the common join cases: Inner join, left semi and left - /// outer - Status LeftJoinGetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); - - /// Processes a probe batch for the common (non right-outer join) cases. - /// out_batch: the batch for resulting tuple rows - /// probe_batch: the probe batch to process. This function can be called to - /// continue processing a batch in the middle - /// max_added_rows: maximum rows that can be added to out_batch - /// return the number of rows added to out_batch - int ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch, int max_added_rows); - - /// Construct the build hash table, adding all the rows in 'build_batch' - void ProcessBuildBatch(RowBatch* build_batch); - - /// Codegen function to create output row - llvm::Function* CodegenCreateOutputRow(LlvmCodeGen* codegen); - - /// Codegen processing build batches. Identical signature to ProcessBuildBatch. - /// hash_fn is the codegen'd function for computing hashes over tuple rows in the - /// hash table. - /// Returns NULL if codegen was not possible. - llvm::Function* CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn); - - /// Codegen processing probe batches. Identical signature to ProcessProbeBatch. - /// hash_fn is the codegen'd function for computing hashes over tuple rows in the - /// hash table. - /// Returns NULL if codegen was not possible. 
- llvm::Function* CodegenProcessProbeBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn); -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 63298cb..e97572c 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -54,8 +54,6 @@ using namespace llvm; DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of " "rows rejected by a runtime filter drops below this value, the filter is disabled."); -DECLARE_bool(enable_partitioned_aggregation); -DECLARE_bool(enable_partitioned_hash_join); // The number of row batches between checks to see if a filter is effective, and // should be disabled. Must be a power of two. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/nested-loop-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc index 01a7f19..d377a22 100644 --- a/be/src/exec/nested-loop-join-node.cc +++ b/be/src/exec/nested-loop-join-node.cc @@ -186,11 +186,6 @@ Status NestedLoopJoinNode::ResetMatchingBuildRows(RuntimeState* state, int64_t n return Status::OK(); } -Status NestedLoopJoinNode::ProcessBuildInput(RuntimeState* state) { - DCHECK(false) << "Should not be called, NLJ uses the BuildSink API"; - return Status::OK(); -} - void NestedLoopJoinNode::ResetForProbe() { DCHECK(build_batches_ != NULL); build_row_iterator_ = build_batches_->Iterator(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/nested-loop-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h index 
94f1dae..c94abbf 100644 --- a/be/src/exec/nested-loop-join-node.h +++ b/be/src/exec/nested-loop-join-node.h @@ -51,9 +51,6 @@ class NestedLoopJoinNode : public BlockingJoinNode { virtual Status Reset(RuntimeState* state); virtual void Close(RuntimeState* state); - protected: - virtual Status ProcessBuildInput(RuntimeState* state); - private: ///////////////////////////////////////// /// BEGIN: Members that must be Reset() http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table-ir.cc b/be/src/exec/old-hash-table-ir.cc deleted file mode 100644 index 2436ef1..0000000 --- a/be/src/exec/old-hash-table-ir.cc +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
-
-#ifdef IR_COMPILE
-
-#include "exec/old-hash-table.h"
-
-namespace impala {
-
-// Out-of-line accessors for OldHashTable members. They are only compiled when
-// IR_COMPILE is defined.
-// NOTE(review): presumably these are the functions that the IRFunction::OLD_HASH_TABLE_GET_*
-// entries resolve to when OldHashTable::CodegenEvalTupleRow() emits calls to load these
-// members -- confirm against the IR function registry.
-
-// Returns the buffer that holds the evaluated expr values of the current row.
-uint8_t* OldHashTable::expr_values_buffer() const {
-  return expr_values_buffer_;
-}
-
-// Returns the per-expr null indicator array of the current row.
-uint8_t* OldHashTable::expr_value_null_bits() const {
-  return expr_value_null_bits_;
-}
-
-// Returns a pointer to the first element of the build expr evaluator vector.
-ScalarExprEvaluator* const* OldHashTable::build_expr_evals() const {
-  return build_expr_evals_.data();
-}
-
-// Returns a pointer to the first element of the probe expr evaluator vector.
-ScalarExprEvaluator* const* OldHashTable::probe_expr_evals() const {
-  return probe_expr_evals_.data();
-}
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-test.cc b/be/src/exec/old-hash-table-test.cc
deleted file mode 100644
index e873791..0000000
--- a/be/src/exec/old-hash-table-test.cc
+++ /dev/null
@@ -1,337 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -#include <stdlib.h> -#include <stdio.h> -#include <iostream> -#include <vector> - -#include "common/compiler-util.h" -#include "exec/old-hash-table.inline.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "exprs/slot-ref.h" -#include "runtime/mem-pool.h" -#include "runtime/mem-tracker.h" -#include "runtime/string-value.h" -#include "runtime/tuple-row.h" -#include "testutil/gtest-util.h" -#include "util/cpu-info.h" -#include "util/runtime-profile-counters.h" - -#include "common/names.h" - -namespace impala { - -class OldHashTableTest : public testing::Test { - public: - OldHashTableTest() : mem_pool_(&tracker_) {} - - protected: - ObjectPool pool_; - MemTracker tracker_; - MemPool mem_pool_; - - vector<ScalarExpr*> build_exprs_; - vector<ScalarExprEvaluator*> build_expr_evals_; - vector<ScalarExpr*> probe_exprs_; - vector<ScalarExprEvaluator*> probe_expr_evals_; - - virtual void SetUp() { - RowDescriptor desc; - // Not very easy to test complex tuple layouts so this test will use the - // simplest. The purpose of these tests is to exercise the hash map - // internals so a simple build/probe expr is fine. 
- ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 0)); - ASSERT_OK(build_expr->Init(desc, nullptr)); - build_exprs_.push_back(build_expr); - ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_, - &build_expr_evals_)); - ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr)); - - ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 0)); - ASSERT_OK(probe_expr->Init(desc, nullptr)); - probe_exprs_.push_back(probe_expr); - ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_, - &probe_expr_evals_)); - ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr)); - } - - virtual void TearDown() { - ScalarExprEvaluator::Close(build_expr_evals_, nullptr); - ScalarExprEvaluator::Close(probe_expr_evals_, nullptr); - ScalarExpr::Close(build_exprs_); - ScalarExpr::Close(probe_exprs_); - } - - TupleRow* CreateTupleRow(int32_t val) { - uint8_t* tuple_row_mem = mem_pool_.Allocate(sizeof(int32_t*)); - Tuple* tuple_mem = Tuple::Create(sizeof(int32_t), &mem_pool_); - *reinterpret_cast<int32_t*>(tuple_mem) = val; - TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem); - row->SetTuple(0, tuple_mem); - return row; - } - - // Wrapper to call private methods on OldHashTable - // TODO: understand google testing, there must be a more natural way to do this - void ResizeTable(OldHashTable* table, int64_t new_size) { - table->ResizeBuckets(new_size); - } - - // Do a full table scan on table. All values should be between [min,max). If - // all_unique, then each key(int value) should only appear once. Results are - // stored in results, indexed by the key. Results must have been preallocated to - // be at least max size. 
- void FullScan(OldHashTable* table, int min, int max, bool all_unique, - TupleRow** results, TupleRow** expected) { - OldHashTable::Iterator iter = table->Begin(); - while (iter != table->End()) { - TupleRow* row = iter.GetRow(); - int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row)); - EXPECT_GE(val, min); - EXPECT_LT(val, max); - if (all_unique) EXPECT_TRUE(results[val] == nullptr); - EXPECT_EQ(row->GetTuple(0), expected[val]->GetTuple(0)); - results[val] = row; - iter.Next<false>(); - } - } - - // Validate that probe_row evaluates overs probe_exprs is equal to build_row - // evaluated over build_exprs - void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) { - EXPECT_TRUE(probe_row != build_row); - int32_t build_val = - *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(probe_row)); - int32_t probe_val = - *reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(build_row)); - EXPECT_EQ(build_val, probe_val); - } - - struct ProbeTestData { - TupleRow* probe_row; - vector<TupleRow*> expected_build_rows; - }; - - void ProbeTest(OldHashTable* table, ProbeTestData* data, int num_data, bool scan) { - for (int i = 0; i < num_data; ++i) { - TupleRow* row = data[i].probe_row; - - OldHashTable::Iterator iter; - iter = table->Find(row); - - if (data[i].expected_build_rows.size() == 0) { - EXPECT_TRUE(iter == table->End()); - } else { - if (scan) { - map<TupleRow*, bool> matched; - while (iter != table->End()) { - EXPECT_TRUE(matched.find(iter.GetRow()) == matched.end()); - matched[iter.GetRow()] = true; - iter.Next<true>(); - } - EXPECT_EQ(matched.size(), data[i].expected_build_rows.size()); - for (int j = 0; i < data[j].expected_build_rows.size(); ++j) { - EXPECT_TRUE(matched[data[i].expected_build_rows[j]]); - } - } else { - EXPECT_EQ(data[i].expected_build_rows.size(), 1); - EXPECT_TRUE( - data[i].expected_build_rows[0]->GetTuple(0) == iter.GetRow()->GetTuple(0)); - ValidateMatch(row, iter.GetRow()); - } - } - } - } -}; - 
-TEST_F(OldHashTableTest, SetupTest) {
-  // Rows holding 1 and 2 stand in for build rows, rows holding 3 and 4 for probe rows.
-  const int kNumRows = 4;
-  TupleRow* rows[kNumRows];
-  for (int idx = 0; idx < kNumRows; ++idx) rows[idx] = CreateTupleRow(idx + 1);
-
-  // Evaluating a row with the evaluator of its side must give back the stored value.
-  for (int idx = 0; idx < kNumRows; ++idx) {
-    ScalarExprEvaluator* eval = idx < 2 ? build_expr_evals_[0] : probe_expr_evals_[0];
-    int32_t* stored = reinterpret_cast<int32_t*>(eval->GetValue(rows[idx]));
-    EXPECT_EQ(*stored, idx + 1);
-  }
-
-  mem_pool_.FreeAll();
-}
-
-// This test inserts the build rows [0->5) to hash table.  It validates that they
-// are all there using a full table scan.  It also validates that Find() is correct
-// testing for probe rows that are both there and not.
-// The hash table is rehashed a few times and the scans/finds are tested again.
-TEST_F(OldHashTableTest, BasicTest) { - TupleRow* build_rows[5]; - TupleRow* scan_rows[5] = {0}; - for (int i = 0; i < 5; ++i) { - build_rows[i] = CreateTupleRow(i); - } - - ProbeTestData probe_rows[10]; - for (int i = 0; i < 10; ++i) { - probe_rows[i].probe_row = CreateTupleRow(i); - if (i < 5) { - probe_rows[i].expected_build_rows.push_back(build_rows[i]); - } - } - - // Create the hash table and insert the build rows - MemTracker tracker; - scoped_ptr<OldHashTable> hash_table; - EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_, - vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false), - 0, &tracker, vector<RuntimeFilter*>(), &hash_table)); - EXPECT_OK(hash_table->Open(nullptr)); - for (int i = 0; i < 5; ++i) { - hash_table->Insert(build_rows[i]); - } - EXPECT_EQ(hash_table->size(), 5); - - // Do a full table scan and validate returned pointers - FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), probe_rows, 10, false); - - // Resize and scan again - ResizeTable(hash_table.get(), 64); - EXPECT_EQ(hash_table->num_buckets(), 64); - EXPECT_EQ(hash_table->size(), 5); - memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), probe_rows, 10, false); - - // Resize to two and cause some collisions - ResizeTable(hash_table.get(), 2); - EXPECT_EQ(hash_table->num_buckets(), 2); - EXPECT_EQ(hash_table->size(), 5); - memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), probe_rows, 10, false); - - // Resize to one and turn it into a linked list - ResizeTable(hash_table.get(), 1); - EXPECT_EQ(hash_table->num_buckets(), 1); - EXPECT_EQ(hash_table->size(), 5); - memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); - ProbeTest(hash_table.get(), probe_rows, 10, false); - - 
hash_table->Close(nullptr); - mem_pool_.FreeAll(); -} - -// This tests makes sure we can scan ranges of buckets -TEST_F(OldHashTableTest, ScanTest) { - MemTracker tracker; - scoped_ptr<OldHashTable> hash_table; - EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_, - vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false), - 0, &tracker, vector<RuntimeFilter*>(), &hash_table)); - EXPECT_OK(hash_table->Open(nullptr)); - // Add 1 row with val 1, 2 with val 2, etc - vector<TupleRow*> build_rows; - ProbeTestData probe_rows[15]; - probe_rows[0].probe_row = CreateTupleRow(0); - for (int val = 1; val <= 10; ++val) { - probe_rows[val].probe_row = CreateTupleRow(val); - for (int i = 0; i < val; ++i) { - TupleRow* row = CreateTupleRow(val); - hash_table->Insert(row); - build_rows.push_back(row); - probe_rows[val].expected_build_rows.push_back(row); - } - } - - // Add some more probe rows that aren't there - for (int val = 11; val < 15; ++val) { - probe_rows[val].probe_row = CreateTupleRow(val); - } - - // Test that all the builds were found - ProbeTest(hash_table.get(), probe_rows, 15, true); - - // Resize and try again - ResizeTable(hash_table.get(), 128); - EXPECT_EQ(hash_table->num_buckets(), 128); - ProbeTest(hash_table.get(), probe_rows, 15, true); - - ResizeTable(hash_table.get(), 16); - EXPECT_EQ(hash_table->num_buckets(), 16); - ProbeTest(hash_table.get(), probe_rows, 15, true); - - ResizeTable(hash_table.get(), 2); - EXPECT_EQ(hash_table->num_buckets(), 2); - ProbeTest(hash_table.get(), probe_rows, 15, true); - - hash_table->Close(nullptr); - mem_pool_.FreeAll(); -} - -// This test continues adding to the hash table to trigger the resize code paths -TEST_F(OldHashTableTest, GrowTableTest) { - int num_to_add = 4; - int expected_size = 0; - MemTracker tracker(100 * 1024 * 1024); - scoped_ptr<OldHashTable> hash_table; - EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_, - vector<ScalarExpr*>(), 1, 
false, std::vector<bool>(build_exprs_.size(), false), - 0, &tracker, vector<RuntimeFilter*>(), &hash_table, false, num_to_add)); - EXPECT_OK(hash_table->Open(nullptr)); - EXPECT_FALSE(hash_table->mem_limit_exceeded()); - EXPECT_TRUE(!tracker.LimitExceeded()); - - // This inserts about 5M entries - int build_row_val = 0; - for (int i = 0; i < 20; ++i) { - for (int j = 0; j < num_to_add; ++build_row_val, ++j) { - hash_table->Insert(CreateTupleRow(build_row_val)); - } - expected_size += num_to_add; - num_to_add *= 2; - } - EXPECT_TRUE(hash_table->mem_limit_exceeded()); - EXPECT_TRUE(tracker.LimitExceeded()); - - // Validate that we can find the entries before we went over the limit - for (int i = 0; i < expected_size * 5; i += 100000) { - TupleRow* probe_row = CreateTupleRow(i); - OldHashTable::Iterator iter = hash_table->Find(probe_row); - if (i < hash_table->size()) { - EXPECT_TRUE(iter != hash_table->End()); - ValidateMatch(probe_row, iter.GetRow()); - } else { - EXPECT_TRUE(iter == hash_table->End()); - } - } - hash_table->Close(nullptr); - mem_pool_.FreeAll(); -} - -} - -IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table.cc b/be/src/exec/old-hash-table.cc deleted file mode 100644 index 9105226..0000000 --- a/be/src/exec/old-hash-table.cc +++ /dev/null @@ -1,872 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/old-hash-table.inline.h" - -#include <functional> -#include <numeric> - -#include "codegen/codegen-anyval.h" -#include "codegen/llvm-codegen.h" -#include "exprs/scalar-expr.h" -#include "exprs/scalar-expr-evaluator.h" -#include "exprs/slot-ref.h" -#include "runtime/mem-tracker.h" -#include "runtime/raw-value.inline.h" -#include "runtime/runtime-filter.h" -#include "runtime/runtime-filter-bank.h" -#include "runtime/runtime-state.h" -#include "runtime/string-value.inline.h" -#include "runtime/tuple-row.h" -#include "util/bloom-filter.h" -#include "runtime/tuple.h" -#include "util/debug-util.h" -#include "util/error-util.h" -#include "util/impalad-metrics.h" - -#include "common/names.h" - -using namespace impala; -using namespace llvm; - -const char* OldHashTable::LLVM_CLASS_NAME = "class.impala::OldHashTable"; - -const float OldHashTable::MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f; -static const int HT_PAGE_SIZE = 8 * 1024 * 1024; - -// Put a non-zero constant in the result location for NULL. -// We don't want(NULL, 1) to hash to the same as (0, 1). -// This needs to be as big as the biggest primitive type since the bytes -// get copied directly. 
-// TODO find a better approach, since primitives like CHAR(N) can be up to 128 bytes -static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED, - HashUtil::FNV_SEED, HashUtil::FNV_SEED }; - -OldHashTable::OldHashTable(RuntimeState* state, - const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs, - const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls, - const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker, - const vector<RuntimeFilter*>& runtime_filters, bool stores_tuples, - int64_t num_buckets) - : state_(state), - build_exprs_(build_exprs), - probe_exprs_(probe_exprs), - filter_exprs_(filter_exprs), - filters_(runtime_filters), - num_build_tuples_(num_build_tuples), - stores_nulls_(stores_nulls), - finds_nulls_(finds_nulls), - finds_some_nulls_(std::accumulate( - finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())), - stores_tuples_(stores_tuples), - initial_seed_(initial_seed), - num_filled_buckets_(0), - num_nodes_(0), - mem_pool_(new MemPool(mem_tracker)), - num_data_pages_(0), - next_node_(NULL), - node_remaining_current_page_(0), - mem_tracker_(mem_tracker), - mem_limit_exceeded_(false) { - DCHECK(mem_tracker != NULL); - DCHECK_EQ(build_exprs_.size(), probe_exprs_.size()); - DCHECK_EQ(build_exprs_.size(), finds_nulls_.size()); - DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2"; - buckets_.resize(num_buckets); - num_buckets_ = num_buckets; - num_buckets_till_resize_ = MAX_BUCKET_OCCUPANCY_FRACTION * num_buckets_; - mem_tracker_->Consume(buckets_.capacity() * sizeof(Bucket)); - - // Compute the layout and buffer size to store the evaluated expr results - 
results_buffer_size_ = ScalarExpr::ComputeResultsLayout(build_exprs_, - &expr_values_buffer_offsets_, &var_result_begin_); - expr_values_buffer_= new uint8_t[results_buffer_size_]; - memset(expr_values_buffer_, 0, sizeof(uint8_t) * results_buffer_size_); - expr_value_null_bits_ = new uint8_t[build_exprs_.size()]; - - GrowNodeArray(); -} - -Status OldHashTable::Init(ObjectPool* pool, RuntimeState* state) { - RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool, - mem_pool_.get(), &build_expr_evals_)); - DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size()); - RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool, - mem_pool_.get(), &probe_expr_evals_)); - DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size()); - RETURN_IF_ERROR(ScalarExprEvaluator::Create(filter_exprs_, state, pool, - mem_pool_.get(), &filter_expr_evals_)); - DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size()); - return Status::OK(); -} - -Status OldHashTable::Create(ObjectPool* pool, RuntimeState* state, - const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs, - const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls, - const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker, - const vector<RuntimeFilter*>& runtime_filters, scoped_ptr<OldHashTable>* hash_tbl, - bool stores_tuples, int64_t num_buckets) { - hash_tbl->reset(new OldHashTable(state, build_exprs, probe_exprs, filter_exprs, - num_build_tuples, stores_nulls, finds_nulls, initial_seed, mem_tracker, - runtime_filters, stores_tuples, num_buckets)); - return (*hash_tbl)->Init(pool, state); -} - -Status OldHashTable::Open(RuntimeState* state) { - RETURN_IF_ERROR(ScalarExprEvaluator::Open(build_expr_evals_, state)); - DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size()); - RETURN_IF_ERROR(ScalarExprEvaluator::Open(probe_expr_evals_, state)); - DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size()); - 
RETURN_IF_ERROR(ScalarExprEvaluator::Open(filter_expr_evals_, state)); - DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size()); - return Status::OK(); -} - -void OldHashTable::Close(RuntimeState* state) { - // TODO: use tr1::array? - delete[] expr_values_buffer_; - delete[] expr_value_null_bits_; - expr_values_buffer_ = NULL; - expr_value_null_bits_ = NULL; - ScalarExprEvaluator::Close(build_expr_evals_, state); - ScalarExprEvaluator::Close(probe_expr_evals_, state); - ScalarExprEvaluator::Close(filter_expr_evals_, state); - mem_pool_->FreeAll(); - if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) { - ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-num_data_pages_ * HT_PAGE_SIZE); - } - mem_tracker_->Release(buckets_.capacity() * sizeof(Bucket)); - buckets_.clear(); -} - -void OldHashTable::FreeLocalAllocations() { - ScalarExprEvaluator::FreeLocalAllocations(build_expr_evals_); - ScalarExprEvaluator::FreeLocalAllocations(probe_expr_evals_); - ScalarExprEvaluator::FreeLocalAllocations(filter_expr_evals_); -} - -bool OldHashTable::EvalRow( - TupleRow* row, const vector<ScalarExprEvaluator*>& evals) { - bool has_null = false; - for (int i = 0; i < evals.size(); ++i) { - void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i]; - void* val = evals[i]->GetValue(row); - if (val == NULL) { - // If the table doesn't store nulls, no reason to keep evaluating - if (!stores_nulls_) return true; - - expr_value_null_bits_[i] = true; - val = &NULL_VALUE; - has_null = true; - } else { - expr_value_null_bits_[i] = false; - } - RawValue::Write(val, loc, build_exprs_[i]->type(), NULL); - } - return has_null; -} - -int OldHashTable::AddBloomFilters() { - int num_enabled_filters = 0; - vector<BloomFilter*> bloom_filters; - bloom_filters.resize(filters_.size()); - for (int i = 0; i < filters_.size(); ++i) { - if (state_->filter_bank()->FpRateTooHigh(filters_[i]->filter_size(), size())) { - bloom_filters[i] = BloomFilter::ALWAYS_TRUE_FILTER; - } else { - 
bloom_filters[i] = - state_->filter_bank()->AllocateScratchBloomFilter(filters_[i]->id()); - ++num_enabled_filters; - } - } - - OldHashTable::Iterator iter = Begin(); - while (iter != End()) { - TupleRow* row = iter.GetRow(); - for (int i = 0; i < filters_.size(); ++i) { - if (bloom_filters[i] == NULL) continue; - void* e = filter_expr_evals_[i]->GetValue(row); - uint32_t h = RawValue::GetHashValue(e, filter_exprs_[i]->type(), - RuntimeFilterBank::DefaultHashSeed()); - bloom_filters[i]->Insert(h); - } - iter.Next<false>(); - } - - // Update all the local filters in the filter bank. - for (int i = 0; i < filters_.size(); ++i) { - state_->filter_bank()->UpdateFilterFromLocal(filters_[i]->id(), bloom_filters[i]); - } - - return num_enabled_filters; -} - -// Helper function to store a value into the results buffer if the expr -// evaluated to NULL. We don't want (NULL, 1) to hash to the same as (0,1) so -// we'll pick a more random value. -static void CodegenAssignNullValue( - LlvmCodeGen* codegen, LlvmBuilder* builder, Value* dst, const ColumnType& type) { - uint64_t fnv_seed = HashUtil::FNV_SEED; - - if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR) { - Value* dst_ptr = builder->CreateStructGEP(NULL, dst, 0, "string_ptr"); - Value* dst_len = builder->CreateStructGEP(NULL, dst, 1, "string_len"); - Value* null_len = codegen->GetIntConstant(TYPE_INT, fnv_seed); - Value* null_ptr = builder->CreateIntToPtr(null_len, codegen->ptr_type()); - builder->CreateStore(null_ptr, dst_ptr); - builder->CreateStore(null_len, dst_len); - return; - } else { - Value* null_value = NULL; - int byte_size = type.GetByteSize(); - // Get a type specific representation of fnv_seed - switch (type.type) { - case TYPE_BOOLEAN: - // In results, booleans are stored as 1 byte - dst = builder->CreateBitCast(dst, codegen->ptr_type()); - null_value = codegen->GetIntConstant(TYPE_TINYINT, fnv_seed); - break; - case TYPE_TIMESTAMP: { - // Cast 'dst' to 'i128*' - DCHECK_EQ(byte_size, 16); - 
PointerType* fnv_seed_ptr_type = - codegen->GetPtrType(Type::getIntNTy(codegen->context(), byte_size * 8)); - dst = builder->CreateBitCast(dst, fnv_seed_ptr_type); - null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed); - break; - } - case TYPE_TINYINT: - case TYPE_SMALLINT: - case TYPE_INT: - case TYPE_BIGINT: - case TYPE_DECIMAL: - null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed); - break; - case TYPE_FLOAT: { - // Don't care about the value, just the bit pattern - float fnv_seed_float = *reinterpret_cast<float*>(&fnv_seed); - null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_float)); - break; - } - case TYPE_DOUBLE: { - // Don't care about the value, just the bit pattern - double fnv_seed_double = *reinterpret_cast<double*>(&fnv_seed); - null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_double)); - break; - } - default: - DCHECK(false); - } - builder->CreateStore(null_value, dst); - } -} - -// Codegen for evaluating a tuple row over either build_exprs_ or probe_exprs_. -// For the case where we are joining on a single int, the IR looks like -// define i1 @EvaBuildRow(%"class.impala::OldHashTable"* %this_ptr, -// %"class.impala::TupleRow"* %row) { -// entry: -// %null_ptr = alloca i1 -// %0 = bitcast %"class.impala::TupleRow"* %row to i8** -// %eval = call i32 @SlotRef(i8** %0, i8* null, i1* %null_ptr) -// %1 = load i1* %null_ptr -// br i1 %1, label %null, label %not_null -// -// null: ; preds = %entry -// ret i1 true -// -// not_null: ; preds = %entry -// store i32 %eval, i32* inttoptr (i64 46146336 to i32*) -// br label %continue -// -// continue: ; preds = %not_null -// %2 = zext i1 %1 to i8 -// store i8 %2, i8* inttoptr (i64 46146248 to i8*) -// ret i1 false -// } -// For each expr, we create 3 code blocks. The null, not null and continue blocks. -// Both the null and not null branch into the continue block. 
The continue block -// becomes the start of the next block for codegen (either the next expr or just the -// end of the function). -Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) { - DCHECK_EQ(build_exprs_.size(), probe_exprs_.size()); - const vector<ScalarExpr*>& exprs = build ? build_exprs_ : probe_exprs_; - for (int i = 0; i < exprs.size(); ++i) { - PrimitiveType type = exprs[i]->type().type; - if (type == TYPE_CHAR) return NULL; - } - - // Get types to generate function prototype - Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); - DCHECK(tuple_row_type != NULL); - PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); - - Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME); - DCHECK(this_type != NULL); - PointerType* this_ptr_type = PointerType::get(this_type, 0); - - LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow", - codegen->GetType(TYPE_BOOLEAN)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); - - LLVMContext& context = codegen->context(); - LlvmBuilder builder(context); - Value* args[2]; - Function* fn = prototype.GeneratePrototype(&builder, args); - Value* this_ptr = args[0]; - Value* row = args[1]; - Value* has_null = codegen->false_value(); - - IRFunction::Type fn_name = build ? - IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS : - IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS; - Function* get_expr_eval_fn = codegen->GetFunction(fn_name, false); - DCHECK(get_expr_eval_fn != NULL); - - // Aggregation with no grouping exprs also use the hash table interface for - // code simplicity. In that case, there are no build exprs. - if (!exprs.empty()) { - // Load build_expr_evals_.data() / probe_expr_evals_.data() - Value* eval_vector = codegen->CodegenCallFunction(&builder, build ? 
- IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS : - IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS, - this_ptr, "eval_vector"); - - // Load expr_values_buffer_ - Value* expr_values_buffer = codegen->CodegenCallFunction(&builder, - IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER, - this_ptr, "expr_values_buffer"); - - // Load expr_values_null_bits_ - Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder, - IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS, - this_ptr, "expr_value_null_bits"); - - for (int i = 0; i < exprs.size(); ++i) { - BasicBlock* null_block = BasicBlock::Create(context, "null", fn); - BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn); - BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn); - - // loc_addr = expr_values_buffer_ + expr_values_buffer_offsets_[i] - Value* llvm_loc = builder.CreateInBoundsGEP(NULL, expr_values_buffer, - codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "loc_addr"); - llvm_loc = builder.CreatePointerCast(llvm_loc, - codegen->GetPtrType(exprs[i]->type()), "loc"); - - // Codegen GetValue() for exprs[i] - Function* expr_fn; - Status status = exprs[i]->GetCodegendComputeFn(codegen, &expr_fn); - if (!status.ok()) { - fn->eraseFromParent(); // deletes function - VLOG_QUERY << "Failed to codegen EvalTupleRow(): " << status.GetDetail(); - return NULL; - } - - // Load evals[i] and call GetValue() - Value* eval_arg = - codegen->CodegenArrayAt(&builder, eval_vector, i, "eval"); - DCHECK(eval_arg->getType()->isPointerTy()); - CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - exprs[i]->type(), expr_fn, {eval_arg, row}, "result"); - Value* is_null = result.GetIsNull(); - - // Set null-byte result - Value* null_bits = builder.CreateZExt(is_null, codegen->GetType(TYPE_TINYINT)); - Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits, - codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc"); 
- builder.CreateStore(null_bits, llvm_null_bits_loc); - builder.CreateCondBr(is_null, null_block, not_null_block); - - // Null block - builder.SetInsertPoint(null_block); - if (!stores_nulls_) { - // hash table doesn't store nulls, no reason to keep evaluating exprs - builder.CreateRet(codegen->true_value()); - } else { - CodegenAssignNullValue(codegen, &builder, llvm_loc, exprs[i]->type()); - has_null = codegen->true_value(); - builder.CreateBr(continue_block); - } - - // Not null block - builder.SetInsertPoint(not_null_block); - result.ToNativePtr(llvm_loc); - builder.CreateBr(continue_block); - - builder.SetInsertPoint(continue_block); - } - } - builder.CreateRet(has_null); - return codegen->FinalizeFunction(fn); -} -

// Hashes the row currently cached in 'expr_values_buffer_' when it contains
// variable-length (STRING/VARCHAR) slots. The hash is seeded with 'initial_seed_'.
// The leading 'var_result_begin_' bytes of the buffer are hashed as one run (skipped when
// that length is 0); each string slot is then folded in individually.
uint32_t OldHashTable::HashVariableLenRow() {
  uint32_t result = initial_seed_;
  if (var_result_begin_ != 0) {
    result = HashUtil::Hash(expr_values_buffer_, var_result_begin_, result);
  }

  for (int idx = 0; idx < build_exprs_.size(); ++idx) {
    // Only string-typed slots need separate treatment; non-string and null slots are
    // already part of expr_values_buffer.
    const auto slot_type = build_exprs_[idx]->type().type;
    const bool is_string_slot = slot_type == TYPE_STRING || slot_type == TYPE_VARCHAR;
    if (!is_string_slot) continue;

    void* slot = expr_values_buffer_ + expr_values_buffer_offsets_[idx];
    if (!expr_value_null_bits_[idx]) {
      // Non-NULL string: hash the bytes the StringValue points at.
      StringValue* sv = reinterpret_cast<StringValue*>(slot);
      result = HashUtil::Hash(sv->ptr, sv->len, result);
    } else {
      // NULL string: hash the null random seed values stored in the slot itself.
      result = HashUtil::Hash(slot, sizeof(StringValue), result);
    }
  }
  return result;
}

// Codegen for hashing the current row.  In the case with both string and non-string data
// (group by int_col, string_col), the IR looks like:
// define i32 @HashCurrentRow(%"class.impala::OldHashTable"* %this_ptr) {
// entry:
//   %0 = call i32 @IrCrcHash(i8* inttoptr (i64 51107808 to i8*), i32 16, i32 0)
//   %1 = load i8* inttoptr (i64 29500112 to i8*)
//   %2 = icmp ne i8 %1, 0
//   br i1 %2, label %null, label %not_null
//
// null:                                             ; preds = %entry
//   %3 = call i32 @IrCrcHash(i8* inttoptr (i64 51107824 to i8*), i32 16, i32 %0)
//   br label %continue
//
// not_null:                                         ; preds = %entry
//   %4 = load i8** getelementptr inbounds (
//        %"struct.impala::StringValue"* inttoptr
//          (i64 51107824 to %"struct.impala::StringValue"*), i32 0, i32 0)
//   %5 = load i32* getelementptr inbounds (
//        %"struct.impala::StringValue"* inttoptr
//          (i64 51107824 to %"struct.impala::StringValue"*), i32 0, i32 1)
//   %6 = call i32 @IrCrcHash(i8* %4, i32 %5, i32 %0)
//   br label %continue
//
// continue:                                         ; preds = %not_null, %null
//   %7 = phi i32 [ %6, %not_null ], [ %3, %null ]
//   ret i32 %7
// }
// TODO: can this be cross-compiled?
-Function* OldHashTable::CodegenHashCurrentRow(LlvmCodeGen* codegen) { - for (int i = 0; i < build_exprs_.size(); ++i) { - // Disable codegen for CHAR - if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL; - } - - // Get types to generate function prototype - Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME); - DCHECK(this_type != NULL); - PointerType* this_ptr_type = PointerType::get(this_type, 0); - - LlvmCodeGen::FnPrototype prototype(codegen, "HashCurrentRow", - codegen->GetType(TYPE_INT)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type)); - - LLVMContext& context = codegen->context(); - LlvmBuilder builder(context); - Value* this_ptr; - Function* fn = prototype.GeneratePrototype(&builder, &this_ptr); - - // Load expr_values_buffer_ - Value* expr_values_buffer = codegen->CodegenCallFunction(&builder, - IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER, this_ptr, "expr_values_buffer"); - - Value* hash_result = codegen->GetIntConstant(TYPE_INT, initial_seed_); - if (var_result_begin_ == -1) { - // No variable length slots, just hash what is in 'expr_values_buffer_' - if (results_buffer_size_ > 0) { - Function* hash_fn = codegen->GetHashFunction(results_buffer_size_); - Value* len = codegen->GetIntConstant(TYPE_INT, results_buffer_size_); - hash_result = builder.CreateCall(hash_fn, {expr_values_buffer, len, hash_result}); - } - } else { - if (var_result_begin_ > 0) { - Function* hash_fn = codegen->GetHashFunction(var_result_begin_); - Value* len = codegen->GetIntConstant(TYPE_INT, var_result_begin_); - hash_result = builder.CreateCall(hash_fn, {expr_values_buffer, len, hash_result}); - } - - // Load expr_value_null_bits_ - Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder, - IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS, - this_ptr, "expr_value_null_bits"); - - // Hash string slots - for (int i = 0; i < build_exprs_.size(); ++i) { - if (build_exprs_[i]->type().type != TYPE_STRING - && 
build_exprs_[i]->type().type != TYPE_VARCHAR) continue; - - BasicBlock* null_block = NULL; - BasicBlock* not_null_block = NULL; - BasicBlock* continue_block = NULL; - Value* str_null_result = NULL; - - Value* llvm_buffer_loc = builder.CreateInBoundsGEP(NULL, expr_values_buffer, - codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "buffer_loc"); - - // If the hash table stores nulls, we need to check if the stringval - // evaluated to NULL - if (stores_nulls_) { - null_block = BasicBlock::Create(context, "null", fn); - not_null_block = BasicBlock::Create(context, "not_null", fn); - continue_block = BasicBlock::Create(context, "continue", fn); - - // Load expr_values_null_bits_[i] and check if it's set. - Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits, - codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc"); - Value* null_bits = builder.CreateLoad(llvm_null_bits_loc); - Value* is_null = builder.CreateICmpNE(null_bits, - codegen->GetIntConstant(TYPE_TINYINT, 0)); - builder.CreateCondBr(is_null, null_block, not_null_block); - - // For null, we just want to call the hash function on a portion of the data. 
- builder.SetInsertPoint(null_block); - Function* null_hash_fn = codegen->GetHashFunction(sizeof(StringValue)); - Value* len = codegen->GetIntConstant(TYPE_INT, sizeof(StringValue)); - str_null_result = builder.CreateCall(null_hash_fn, - ArrayRef<Value*>({llvm_buffer_loc, len, hash_result})); - builder.CreateBr(continue_block); - - builder.SetInsertPoint(not_null_block); - } - - // Convert expr_values_buffer_ loc to llvm value - Value* str_val = builder.CreatePointerCast(llvm_buffer_loc, - codegen->GetPtrType(TYPE_STRING), "str_val"); - - Value* ptr = builder.CreateStructGEP(NULL, str_val, 0, "ptr"); - Value* len = builder.CreateStructGEP(NULL, str_val, 1, "len"); - ptr = builder.CreateLoad(ptr); - len = builder.CreateLoad(len); - - // Call hash(ptr, len, hash_result); - Function* general_hash_fn = codegen->GetHashFunction(); - Value* string_hash_result = - builder.CreateCall(general_hash_fn, ArrayRef<Value*>({ptr, len, hash_result})); - - if (stores_nulls_) { - builder.CreateBr(continue_block); - builder.SetInsertPoint(continue_block); - // Use phi node to reconcile that we could have come from the string-null - // path and string not null paths. 
- PHINode* phi_node = builder.CreatePHI(codegen->GetType(TYPE_INT), 2); - phi_node->addIncoming(string_hash_result, not_null_block); - phi_node->addIncoming(str_null_result, null_block); - hash_result = phi_node; - } else { - hash_result = string_hash_result; - } - } - } - - builder.CreateRet(hash_result); - return codegen->FinalizeFunction(fn); -} - -bool OldHashTable::Equals(TupleRow* build_row) { - for (int i = 0; i < build_exprs_.size(); ++i) { - void* val = build_expr_evals_[i]->GetValue(build_row); - if (val == NULL) { - if (!(stores_nulls_ && finds_nulls_[i])) return false; - if (!expr_value_null_bits_[i]) return false; - continue; - } else { - if (expr_value_null_bits_[i]) return false; - } - - void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i]; - if (!RawValue::Eq(loc, val, build_exprs_[i]->type())) { - return false; - } - } - return true; -} - -// Codegen for OldHashTable::Equals. For a hash table with two exprs (string,int), the -// IR looks like: -// -// define i1 @Equals(%"class.impala::OldHashTable"* %this_ptr, -// %"class.impala::TupleRow"* %row) { -// entry: -// %result = call i64 @GetSlotRef(%"class.impala::ScalarExpr"* inttoptr -// (i64 146381856 to %"class.impala::ScalarExpr"*), -// %"class.impala::TupleRow"* %row) -// %0 = trunc i64 %result to i1 -// br i1 %0, label %null, label %not_null -// -// false_block: ; preds = %not_null2, %null1, %not_null, %null -// ret i1 false -// -// null: ; preds = %entry -// br i1 false, label %continue, label %false_block -// -// not_null: ; preds = %entry -// %1 = load i32* inttoptr (i64 104774368 to i32*) -// %2 = ashr i64 %result, 32 -// %3 = trunc i64 %2 to i32 -// %cmp_raw = icmp eq i32 %3, %1 -// br i1 %cmp_raw, label %continue, label %false_block -// -// continue: ; preds = %not_null, %null -// %result4 = call { i64, i8* } @GetSlotRef1( -// %"class.impala::ScalarExpr"* inttoptr -// (i64 146381696 to %"class.impala::ScalarExpr"*), -// %"class.impala::TupleRow"* %row) -// %4 = extractvalue { 
i64, i8* } %result4, 0 -// %5 = trunc i64 %4 to i1 -// br i1 %5, label %null1, label %not_null2 -// -// null1: ; preds = %continue -// br i1 false, label %continue3, label %false_block -// -// not_null2: ; preds = %continue -// %6 = extractvalue { i64, i8* } %result4, 0 -// %7 = ashr i64 %6, 32 -// %8 = trunc i64 %7 to i32 -// %result5 = extractvalue { i64, i8* } %result4, 1 -// %cmp_raw6 = call i1 @_Z11StringValEQPciPKN6impala11StringValueE( -// i8* %result5, i32 %8, %"struct.impala::StringValue"* inttoptr -// (i64 104774384 to %"struct.impala::StringValue"*)) -// br i1 %cmp_raw6, label %continue3, label %false_block -// -// continue3: ; preds = %not_null2, %null1 -// ret i1 true -// } -Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) { - for (int i = 0; i < build_exprs_.size(); ++i) { - // Disable codegen for CHAR - if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL; - } - - // Get types to generate function prototype - Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME); - DCHECK(tuple_row_type != NULL); - PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0); - - Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME); - DCHECK(this_type != NULL); - PointerType* this_ptr_type = PointerType::get(this_type, 0); - - LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type)); - prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type)); - - LLVMContext& context = codegen->context(); - LlvmBuilder builder(context); - Value* args[2]; - Function* fn = prototype.GeneratePrototype(&builder, args); - Value* this_ptr = args[0]; - Value* row = args[1]; - - if (!build_exprs_.empty()) { - BasicBlock* false_block = BasicBlock::Create(context, "false_block", fn); - - // Load build_expr_evals_.data() - Value* eval_vector = codegen->CodegenCallFunction(&builder, - 
IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS, - this_ptr, "eval_vector"); - - // Load expr_values_buffer_ - Value* expr_values_buffer = codegen->CodegenCallFunction(&builder, - IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER, - this_ptr, "expr_values_buffer"); - - // Load expr_value_null_bits_ - Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder, - IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS, - this_ptr, "expr_value_null_bits"); - - for (int i = 0; i < build_exprs_.size(); ++i) { - BasicBlock* null_block = BasicBlock::Create(context, "null", fn); - BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn); - BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn); - - // Generate GetValue() of build_expr_evals_[i] - Function* expr_fn; - Status status = build_exprs_[i]->GetCodegendComputeFn(codegen, &expr_fn); - if (!status.ok()) { - fn->eraseFromParent(); // deletes function - VLOG_QUERY << "Failed to codegen Equals(): " << status.GetDetail(); - return NULL; - } - - // Call GetValue() on build_expr_evals_[i] - Value* eval_arg = - codegen->CodegenArrayAt(&builder, eval_vector, i, "eval"); - CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder, - build_exprs_[i]->type(), expr_fn, {eval_arg, row}, "result"); - Value* is_null = result.GetIsNull(); - - // Determine if probe is null (i.e. expr_value_null_bits_[i] == true). In - // the case where the hash table does not store nulls, this is always false. 
- Value* probe_is_null = codegen->false_value(); - if (stores_nulls_ && finds_nulls_[i]) { - Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits, - codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc"); - Value* null_bits = builder.CreateLoad(llvm_null_bits_loc, "null_bits"); - probe_is_null = builder.CreateICmpNE(null_bits, - codegen->GetIntConstant(TYPE_TINYINT, 0)); - } - - // Get llvm value for probe_val from 'expr_values_buffer_' - Value* probe_val = builder.CreateInBoundsGEP(NULL, expr_values_buffer, - codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "probe_val"); - probe_val = builder.CreatePointerCast( - probe_val, codegen->GetPtrType(build_exprs_[i]->type())); - - // Branch for GetValue() returning NULL - builder.CreateCondBr(is_null, null_block, not_null_block); - - // Null block - builder.SetInsertPoint(null_block); - builder.CreateCondBr(probe_is_null, continue_block, false_block); - - // Not-null block - builder.SetInsertPoint(not_null_block); - if (stores_nulls_) { - BasicBlock* cmp_block = BasicBlock::Create(context, "cmp", fn); - // First need to compare that probe_expr[i] is not null - builder.CreateCondBr(probe_is_null, false_block, cmp_block); - builder.SetInsertPoint(cmp_block); - } - // Check result == probe_val - Value* is_equal = result.EqToNativePtr(probe_val); - builder.CreateCondBr(is_equal, continue_block, false_block); - - builder.SetInsertPoint(continue_block); - } - builder.CreateRet(codegen->true_value()); - - builder.SetInsertPoint(false_block); - builder.CreateRet(codegen->false_value()); - } else { - builder.CreateRet(codegen->true_value()); - } - return codegen->FinalizeFunction(fn); -} - -void OldHashTable::ResizeBuckets(int64_t num_buckets) { - DCHECK_EQ((num_buckets & (num_buckets-1)), 0) - << "num_buckets=" << num_buckets << " must be a power of 2"; - - int64_t old_num_buckets = num_buckets_; - // This can be a rather large allocation so check the limit before (to prevent - // us 
from going over the limits too much). - int64_t delta_size = (num_buckets - old_num_buckets) * sizeof(Bucket); - if (!mem_tracker_->TryConsume(delta_size)) { - MemLimitExceeded(delta_size); - return; - } - buckets_.resize(num_buckets); - - // If we're doubling the number of buckets, all nodes in a particular bucket - // either remain there, or move down to an analogous bucket in the other half. - // In order to efficiently check which of the two buckets a node belongs in, the number - // of buckets must be a power of 2. - bool doubled_buckets = (num_buckets == old_num_buckets * 2); - for (int i = 0; i < num_buckets_; ++i) { - Bucket* bucket = &buckets_[i]; - Bucket* sister_bucket = &buckets_[i + old_num_buckets]; - Node* last_node = NULL; - Node* node = bucket->node; - - while (node != NULL) { - Node* next = node->next; - uint32_t hash = node->hash; - - bool node_must_move; - Bucket* move_to; - if (doubled_buckets) { - node_must_move = ((hash & old_num_buckets) != 0); - move_to = sister_bucket; - } else { - int64_t bucket_idx = hash & (num_buckets - 1); - node_must_move = (bucket_idx != i); - move_to = &buckets_[bucket_idx]; - } - - if (node_must_move) { - MoveNode(bucket, move_to, node, last_node); - } else { - last_node = node; - } - - node = next; - } - } - - num_buckets_ = num_buckets; - num_buckets_till_resize_ = MAX_BUCKET_OCCUPANCY_FRACTION * num_buckets_; -} - -void OldHashTable::GrowNodeArray() { - node_remaining_current_page_ = HT_PAGE_SIZE / sizeof(Node); - next_node_ = reinterpret_cast<Node*>(mem_pool_->Allocate(HT_PAGE_SIZE)); - ++num_data_pages_; - if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) { - ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(HT_PAGE_SIZE); - } - if (mem_tracker_->LimitExceeded()) MemLimitExceeded(HT_PAGE_SIZE); -} - -void OldHashTable::MemLimitExceeded(int64_t allocation_size) { - mem_limit_exceeded_ = true; - if (state_ != NULL) state_->SetMemLimitExceeded(mem_tracker_, allocation_size); -} - -string 
OldHashTable::DebugString(bool skip_empty, bool show_match, - const RowDescriptor* desc) { - stringstream ss; - ss << endl; - for (int i = 0; i < buckets_.size(); ++i) { - Node* node = buckets_[i].node; - bool first = true; - if (skip_empty && node == NULL) continue; - ss << i << ": "; - while (node != NULL) { - if (!first) ss << ","; - ss << node << "(" << node->data << ")"; - if (desc != NULL) ss << " " << PrintRow(GetRow(node), *desc); - if (show_match) { - if (node->matched) { - ss << " [M]"; - } else { - ss << " [U]"; - } - } - node = node->next; - first = false; - } - ss << endl; - } - return ss.str(); -}
