http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 45b5a0e..d575c01 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -53,13 +53,13 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
 template<bool AGGREGATED_ROWS>
 Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
     HashTableCtx* __restrict__ ht_ctx) {
-  uint32_t hash = 0;
   if (AGGREGATED_ROWS) {
-    if (!ht_ctx->EvalAndHashBuild(row, &hash)) return Status::OK();
+    if (!ht_ctx->EvalAndHashBuild(row)) return Status::OK();
   } else {
-    if (!ht_ctx->EvalAndHashProbe(row, &hash)) return Status::OK();
+    if (!ht_ctx->EvalAndHashProbe(row)) return Status::OK();
   }
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
   // To process this row, we first see if it can be aggregated or inserted into this
   // partition's hash table. If we need to insert it and that fails, due to OOM, we
   // spill the partition. The partition to spill is not necessarily dst_partition,
@@ -76,7 +76,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
   bool found;
   // Find the appropriate bucket in the hash table. There will always be a free
   // bucket because we checked the size above.
-  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, hash, &found);
+  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, &found);
   DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
   if (AGGREGATED_ROWS) {
     // If the row is already an aggregate row, it cannot match anything in the
@@ -149,9 +149,9 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());

   FOREACH_ROW(in_batch, 0, in_batch_iter) {
-    uint32_t hash;
     TupleRow* in_row = in_batch_iter.Get();
-    if (!ht_ctx->EvalAndHashProbe(in_row, &hash)) continue;
+    if (!ht_ctx->EvalAndHashProbe(in_row)) continue;
+    uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();

     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
     if (TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], in_row, hash,
@@ -192,7 +192,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
   DCHECK_GE(*remaining_capacity, 0);
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not aggregated.
-  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, hash, &found);
+  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, &found);
   Tuple* intermediate_tuple;
   if (found) {
     intermediate_tuple = it.GetTuple();
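Throughout these call sites the hash is now fetched from the HashTableCtx's expression
values cache instead of being returned through an out-parameter, and the top bits of the
32-bit hash pick the fanout partition. A minimal sketch of that partition-index
computation, with an assumed 4-bit fanout standing in for Impala's actual
NUM_PARTITIONING_BITS constant:

#include <cassert>
#include <cstdint>

// Hypothetical stand-in for Impala's NUM_PARTITIONING_BITS.
constexpr int kPartitioningBits = 4;
constexpr uint32_t kNumPartitions = 1u << kPartitioningBits;  // 16 partitions

// Using the top bits (rather than hash % kNumPartitions) leaves the low bits
// free for the in-partition hash table's bucket index, so the two uses of the
// same hash value are not correlated.
inline uint32_t PartitionIdx(uint32_t hash) {
  return hash >> (32 - kPartitioningBits);
}

int main() {
  uint32_t idx = PartitionIdx(0xDEADBEEF);  // top nibble 0xD == 13
  assert(idx == 13 && idx < kNumPartitions);
  return 0;
}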
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index b7dca61..2cde059 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -252,9 +252,9 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
     RETURN_IF_ERROR(state_->GetQueryStatus());
     singleton_output_tuple_returned_ = false;
   } else {
-    ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, grouping_expr_ctxs_, true,
-        std::vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
-        MAX_PARTITION_DEPTH, 1));
+    RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, grouping_expr_ctxs_,
+        true, vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
+        MAX_PARTITION_DEPTH, 1, mem_tracker(), &ht_ctx_));
     RETURN_IF_ERROR(state_->block_mgr()->RegisterClient(
         Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this),
         MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
@@ -984,9 +984,9 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() {
   int varlen_size = 0;
   // TODO: The hash table could compute this as it hashes.
   for (int expr_idx: string_grouping_exprs_) {
-    StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->last_expr_value(expr_idx));
+    StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx));
     // Avoid branching by multiplying length by null bit.
-    varlen_size += sv->len * !ht_ctx_->last_expr_value_null(expr_idx);
+    varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx);
   }
   return varlen_size;
 }
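GroupingExprsVarlenSize() above sizes the variable-length data with a branch-free trick:
a NULL value contributes len * 0 and a non-NULL value contributes len * 1, so the hot
loop has no data-dependent branch. A self-contained sketch of the same idea (the Val
struct here is invented for illustration):

#include <cstdint>
#include <vector>

struct Val { int32_t len; bool is_null; };

// Branchless accumulation: '!v.is_null' is 0 or 1, so a null row adds nothing.
// Keeping this as straight-line code avoids branch mispredictions in tight
// per-row loops.
int TotalVarlenSize(const std::vector<Val>& vals) {
  int total = 0;
  for (const Val& v : vals) total += v.len * !v.is_null;
  return total;
}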
@@ -997,17 +997,17 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,

   // Copy over all grouping slots (the variable length data is copied below).
   for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) {
     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
-    if (ht_ctx_->last_expr_value_null(i)) {
+    if (ht_ctx_->ExprValueNull(i)) {
       intermediate_tuple->SetNull(slot_desc->null_indicator_offset());
     } else {
-      void* src = ht_ctx_->last_expr_value(i);
+      void* src = ht_ctx_->ExprValue(i);
       void* dst = intermediate_tuple->GetSlot(slot_desc->tuple_offset());
       memcpy(dst, src, slot_desc->slot_size());
     }
   }

   for (int expr_idx: string_grouping_exprs_) {
-    if (ht_ctx_->last_expr_value_null(expr_idx)) continue;
+    if (ht_ctx_->ExprValueNull(expr_idx)) continue;

     SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx];
     // ptr and len were already copied to the fixed-len part of string value
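Prepare() in this file (first hunk above) trades HashTableCtx's raw constructor for a
HashTableCtx::Create() factory that returns a Status, so a failed allocation of the new
expression values cache surfaces as a query error instead of dying inside a constructor.
A generic sketch of that Status-returning two-phase-construction pattern, with
simplified types that are not Impala's real signatures:

#include <cstdint>
#include <memory>
#include <string>

struct Status {
  static Status OK() { return Status(); }
  bool ok() const { return msg.empty(); }
  std::string msg;
};

class HashCtx {
 public:
  // Two-phase construction: the constructor stays trivial and Init() does the
  // fallible work (e.g. allocating a values cache against a memory budget).
  static Status Create(int cache_capacity, std::unique_ptr<HashCtx>* out) {
    std::unique_ptr<HashCtx> ctx(new HashCtx());
    Status status = ctx->Init(cache_capacity);
    if (!status.ok()) return status;  // nothing leaked; 'ctx' cleans up
    *out = std::move(ctx);
    return Status::OK();
  }

 private:
  HashCtx() {}
  Status Init(int cache_capacity) {
    if (cache_capacity <= 0) return Status{"invalid cache capacity"};
    cache_.reset(new uint32_t[cache_capacity]);
    return Status::OK();
  }
  std::unique_ptr<uint32_t[]> cache_;
};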
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index ef4c010..d66eda0 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -219,67 +219,145 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
 }

 template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
+    ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+    ExprContext* const* conjunct_ctxs, int num_conjuncts,
+    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+  if (JoinOp == TJoinOp::INNER_JOIN) {
+    return ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
+        conjunct_ctxs, num_conjuncts, out_batch_iterator, remaining_capacity);
+  } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
+      JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
+    return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity);
+  } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
+      JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
+      JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+    return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity, status);
+  } else {
+    DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
+        JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
+    return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
+        num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+        remaining_capacity);
+  }
+}
+
+template<int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
     HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
-    int* remaining_capacity, int num_other_join_conjuncts, Status* status) {
-  while (!probe_batch_iterator->AtEnd()) {
+    int* remaining_capacity, Status* status) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  while (!expr_vals_cache->AtEnd()) {
     // Establish current_probe_row_ and find its corresponding partition.
+    DCHECK(!probe_batch_iterator->AtEnd());
     current_probe_row_ = probe_batch_iterator->Get();
-    probe_batch_iterator->Next();
     matched_probe_ = false;
-    uint32_t hash;
-    if (!ht_ctx->EvalAndHashProbe(current_probe_row_, &hash)) {
+    // True if the current row should be skipped for probing.
+    bool skip_row = false;
+
+    // The hash of the expression results for the current probe row.
+    uint32_t hash = expr_vals_cache->ExprValuesHash();
+    // Hoist the following out of the else statement below to speed up the non-null case.
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    HashTable* hash_tbl = hash_tbls_[partition_idx];
+
+    // Fetch the hash and expr values' nullness for this row.
+    if (expr_vals_cache->IsRowNull()) {
       if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) {
+        const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
        // For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
-        // 1. No build rows -> Return this row.
+        // 1. No build rows -> Return this row. The check for 'non_empty_build_'
+        //    is for this case.
        // 2. Has build rows & no other join predicates, skip row.
        // 3. Has build rows & other join predicates, we need to evaluate against all
        //    build rows. First evaluate it against this partition, and if there is not
        //    a match, save it to evaluate against other partitions later. If there
        //    is a match, the row is skipped.
-        if (num_other_join_conjuncts > 0) {
-          if (UNLIKELY(!AppendRow(null_probe_rows_, current_probe_row_, status))) {
+        if (num_other_join_conjuncts == 0) {
+          // Condition 2 above.
+          skip_row = true;
+        } else if (LIKELY(AppendRow(null_probe_rows_, current_probe_row_, status))) {
+          // Condition 3 above.
+          matched_null_probe_.push_back(false);
+          skip_row = true;
+        } else {
+          // Condition 3 above but failed to append to 'null_probe_rows_'. Bail out.
+          DCHECK(!status->ok());
+          return false;
+        }
+      }
+    } else {
+      // The build partition is in memory. Return this row for probing.
+      if (LIKELY(hash_tbl != NULL)) {
+        hash_tbl_iterator_ = hash_tbl->FindProbeRow(ht_ctx);
+      } else {
+        // The build partition is either empty or spilled.
+        Partition* partition = hash_partitions_[partition_idx];
+        // This partition is closed, meaning the build side for this partition was empty.
+        if (UNLIKELY(partition->is_closed())) {
+          DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+        } else {
+          // This partition is not in memory, spill the probe row and move to the next row.
+          DCHECK(partition->is_spilled());
+          DCHECK(partition->probe_rows() != NULL);
+          // Skip the current row if we manage to append to the spilled partition's BTS.
+          // Otherwise, we need to bail out and report the failure.
+          if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
             DCHECK(!status->ok());
             return false;
           }
-          matched_null_probe_.push_back(false);
+          skip_row = true;
         }
-        continue;
       }
-      return true;
-    }
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-    // The build partition is in memory. Return this row for probing.
-    if (LIKELY(hash_tbls_[partition_idx] != NULL)) {
-      hash_tbl_iterator_ = hash_tbls_[partition_idx]->FindProbeRow(ht_ctx, hash);
-      return true;
-    }
-    // The build partition is either empty or spilled.
-    Partition* partition = hash_partitions_[partition_idx];
-    // This partition is closed, meaning the build side for this partition was empty.
-    if (UNLIKELY(partition->is_closed())) {
-      DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
-      return true;
-    }
-    // This partition is not in memory, spill the probe row and move to the next row.
-    DCHECK(partition->is_spilled());
-    DCHECK(partition->probe_rows() != NULL);
-    if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
-      DCHECK(!status->ok());
-      return false;
     }
+    // Move to the next probe row and hash table context's cached value.
+    probe_batch_iterator->Next();
+    expr_vals_cache->NextRow();
+    if (skip_row) continue;
+    DCHECK(status->ok());
+    return true;
+  }
+  if (probe_batch_iterator->AtEnd()) {
+    // No more probe rows.
+    current_probe_row_ = NULL;
   }
-  // Finished this batch.
-  current_probe_row_ = NULL;
   return false;
 }

-// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
-// codegen.
+void IR_ALWAYS_INLINE PartitionedHashJoinNode::EvalAndHashProbePrefetchGroup(
+    TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx) {
+  RowBatch* probe_batch = probe_batch_.get();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int prefetch_size = expr_vals_cache->capacity();
+  DCHECK(expr_vals_cache->AtEnd());
+
+  expr_vals_cache->Reset();
+  FOREACH_ROW_LIMIT(probe_batch, probe_batch_pos_, prefetch_size, batch_iter) {
+    TupleRow* row = batch_iter.Get();
+    if (ht_ctx->EvalAndHashProbe(row)) {
+      if (prefetch_mode != TPrefetchMode::NONE) {
+        uint32_t hash = expr_vals_cache->ExprValuesHash();
+        const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+        HashTable* hash_tbl = hash_tbls_[partition_idx];
+        if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<true>(hash);
+      }
+    } else {
+      expr_vals_cache->SetRowNull();
+    }
+    expr_vals_cache->NextRow();
+  }
+  expr_vals_cache->ResetForRead();
+}
+
+// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by codegen.
 template<int const JoinOp>
-int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
-    HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode,
+    RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
   ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
   const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
   ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
@@ -292,48 +370,51 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
   // Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
   RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
   int remaining_capacity = max_rows;
+  bool has_probe_rows = current_probe_row_ != NULL || !probe_batch_iterator.AtEnd();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();

-  do {
-    if (current_probe_row_ != NULL) {
-      if (JoinOp == TJoinOp::INNER_JOIN) {
-        if (!ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
-            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity)) {
-          break;
-        }
-      } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
-          JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
-        if (!ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity)) {
-          break;
-        }
-      } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
-          JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
-          JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-        if (!ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity, status)) {
-          break;
-        }
-      } else {
-        DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
-            JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
-        if (!ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
-            num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
-            &remaining_capacity)) {
+  // Keep processing more probe rows if there are more to process and the output batch
+  // has room and we haven't hit any error yet.
+  while (has_probe_rows && remaining_capacity > 0 && status->ok()) {
+    // Prefetch for the current hash_tbl_iterator_.
+    if (prefetch_mode != TPrefetchMode::NONE) {
+      hash_tbl_iterator_.PrefetchBucket<true>();
+    }
+    // Evaluate and hash more rows if the prefetch group is empty. A prefetch group is a
+    // cache of probe expression results, nullness of the expression values and hash
+    // values for some consecutive number of rows in the probe batch. Prefetching, if
+    // enabled, is interleaved with the rows' evaluation and hashing. If the prefetch
+    // group is partially full (e.g. we returned before the current prefetch group was
+    // exhausted in the previous iteration), we will proceed with the remaining items in
+    // the values cache.
+    if (expr_vals_cache->AtEnd()) {
+      EvalAndHashProbePrefetchGroup(prefetch_mode, ht_ctx);
+    }
+    // Process the prefetch group.
+    do {
+      // 'current_probe_row_' can be NULL on the first iteration through this loop.
+      if (current_probe_row_ != NULL) {
+        if (!ProcessProbeRow<JoinOp>(other_join_conjunct_ctxs, num_other_join_conjuncts,
+            conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity,
+            status)) {
+          if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
           break;
         }
       }
-    }
-    // Must have reached the end of the hash table iterator for the current row before
-    // moving to the next row.
-    DCHECK(hash_tbl_iterator_.AtEnd());
-    DCHECK(status->ok());
-  } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
-      num_other_join_conjuncts, status));
-  // Update where we are in the probe batch.
-  probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
-      probe_batch_->num_tuples_per_row();
+      // Must have reached the end of the hash table iterator for the current row before
+      // moving to the next row.
+      DCHECK(hash_tbl_iterator_.AtEnd());
+      DCHECK(status->ok());
+    } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
+        status));
+    // Update whether there are more probe rows to process in the current batch.
+    has_probe_rows = current_probe_row_ != NULL;
+    if (!has_probe_rows) DCHECK(probe_batch_iterator.AtEnd());
+    // Update where we are in the probe batch.
+    probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
+        probe_batch_->num_tuples_per_row();
+  }
+
   int num_rows_added;
   if (LIKELY(status->ok())) {
     num_rows_added = max_rows - remaining_capacity;
@@ -346,42 +427,14 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
   return num_rows_added;
 }

-int PartitionedHashJoinNode::ProcessProbeBatch(
-    const TJoinOp::type join_op, RowBatch* out_batch,
-    HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
-  switch (join_op) {
-    case TJoinOp::INNER_JOIN:
-      return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_SEMI_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::LEFT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(out_batch, ht_ctx,
-          status);
-    case TJoinOp::RIGHT_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::RIGHT_SEMI_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::RIGHT_ANTI_JOIN:
-      return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(out_batch, ht_ctx, status);
-    case TJoinOp::FULL_OUTER_JOIN:
-      return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(out_batch, ht_ctx, status);
-    default:
-      DCHECK(false) << "Unknown join type";
-      return -1;
-  }
-}
-
 Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
     bool build_filters) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx_->expr_values_cache();
+  expr_vals_cache->Reset();
   FOREACH_ROW(build_batch, 0, build_batch_iter) {
     DCHECK(build_status_.ok());
-    uint32_t hash;
     TupleRow* build_row = build_batch_iter.Get();
-    if (!ht_ctx_->EvalAndHashBuild(build_row, &hash)) {
+    if (!ht_ctx_->EvalAndHashBuild(build_row)) {
       if (null_aware_partition_ != NULL) {
         // TODO: remove with codegen/template
         // If we are NULL aware and this build row has NULL in the eq join slot,
@@ -405,6 +458,7 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
         ctx.local_bloom_filter->Insert(filter_hash);
       }
     }
+    const uint32_t hash = expr_vals_cache->ExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
     Partition* partition = hash_partitions_[partition_idx];
     const bool result = AppendRow(partition->build_rows(), build_row, &build_status_);
@@ -413,37 +467,69 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
   return Status::OK();
 }

-bool PartitionedHashJoinNode::Partition::InsertBatch(HashTableCtx* ht_ctx,
-    RowBatch* batch, const vector<BufferedTupleStream::RowIdx>& indices) {
-  DCHECK_LE(batch->num_rows(), hash_values_.size());
-  DCHECK_LE(batch->num_rows(), null_bitmap_.num_bits());
+bool PartitionedHashJoinNode::Partition::InsertBatch(
+    TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch,
+    const vector<BufferedTupleStream::RowIdx>& indices) {
   // Compute the hash values and prefetch the hash table buckets.
-  int i = 0;
-  uint32_t* hash_values = hash_values_.data();
-  null_bitmap_.SetAllBits(false);
-  FOREACH_ROW(batch, 0, batch_iter) {
-    if (ht_ctx->EvalAndHashBuild(batch_iter.Get(), &hash_values[i])) {
-      // TODO: Find the optimal prefetch batch size. This may be something
-      // processor dependent so we may need calibration at Impala startup time.
-      hash_tbl_->PrefetchBucket<false>(hash_values[i]);
-    } else {
-      null_bitmap_.Set<false>(i, true);
+  const int num_rows = batch->num_rows();
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int prefetch_size = expr_vals_cache->capacity();
+  const BufferedTupleStream::RowIdx* row_indices = indices.data();
+  for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
+       prefetch_group_row += prefetch_size) {
+    int cur_row = prefetch_group_row;
+    expr_vals_cache->Reset();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+      if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
+        if (prefetch_mode != TPrefetchMode::NONE) {
+          hash_tbl_->PrefetchBucket<false>(expr_vals_cache->ExprValuesHash());
+        }
+      } else {
+        expr_vals_cache->SetRowNull();
+      }
+      expr_vals_cache->NextRow();
     }
-    ++i;
-  }
-  // Do the insertion.
-  i = 0;
-  const BufferedTupleStream::RowIdx* row_idx = indices.data();
-  FOREACH_ROW(batch, 0, batch_iter) {
-    if (LIKELY(!null_bitmap_.Get<false>(i))) {
+    // Do the insertion.
+    expr_vals_cache->ResetForRead();
+    FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
       TupleRow* row = batch_iter.Get();
-      if (UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx[i], row, hash_values[i]))) {
+      BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
+      if (!expr_vals_cache->IsRowNull() &&
+          UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
         return false;
       }
+      expr_vals_cache->NextRow();
+      ++cur_row;
     }
-    ++i;
   }
   return true;
 }

+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(
+    TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+    Status* status);
 }
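EvalAndHashProbePrefetchGroup() and the rewritten InsertBatch() above share one
structure: hash a small group of rows and issue a prefetch per hash (pass 1), then come
back and do the dependent bucket work (pass 2), so the cache misses overlap instead of
serializing. A stripped-down, self-contained sketch of that two-pass prefetch-group
pattern, using GCC/Clang's __builtin_prefetch and an invented toy hash table:

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Toy hash table: 1024 buckets of one counter each.
constexpr uint32_t kNumBuckets = 1024;
static uint32_t g_buckets[kNumBuckets];

static uint32_t HashRow(uint32_t row) { return row * 2654435761u; }  // multiplicative hash

int main() {
  std::vector<uint32_t> rows(10000);
  for (uint32_t i = 0; i < rows.size(); ++i) rows[i] = i;

  constexpr size_t kGroupSize = 16;  // illustrative prefetch group size
  uint32_t hashes[kGroupSize];
  for (size_t start = 0; start < rows.size(); start += kGroupSize) {
    size_t end = std::min(rows.size(), start + kGroupSize);
    // Pass 1: hash the group and issue a prefetch per bucket. The loads are
    // independent, so multiple misses can be in flight at once.
    for (size_t i = start; i < end; ++i) {
      hashes[i - start] = HashRow(rows[i]);
      __builtin_prefetch(&g_buckets[hashes[i - start] % kNumBuckets]);
    }
    // Pass 2: the dependent work now hits (mostly) warm cache lines.
    for (size_t i = start; i < end; ++i) ++g_buckets[hashes[i - start] % kNumBuckets];
  }
  std::printf("bucket[0] = %u\n", g_buckets[0]);
  return 0;
}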
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 47c118a..1888e06 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -143,7 +143,9 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
   // Although ConstructBuildSide() maybe be run in a separate thread, it is safe to free
   // local allocations in QueryMaintenance() since the build thread is not run
   // concurrently with other expr evaluation in this join node.
-  AddExprCtxsToFree(probe_expr_ctxs_);
+  // Probe side exprs are not included in QueryMaintenance(). We cache the probe
+  // expression values in ExprValuesCache. Local allocations need to survive until the
+  // cache is reset, so we need to manually free probe expr local allocations.
   AddExprCtxsToFree(build_expr_ctxs_);

   // other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all
@@ -162,10 +164,10 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
       join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN ||
       std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false,
       std::logical_or<bool>());
-  ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, probe_expr_ctxs_, should_store_nulls,
-      is_not_distinct_from_, state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
-      child(1)->row_desc().tuple_descriptors().size()));
-
+  RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
+      should_store_nulls, is_not_distinct_from_, state->fragment_hash_seed(),
+      MAX_PARTITION_DEPTH, child(1)->row_desc().tuple_descriptors().size(), mem_tracker(),
+      &ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
   }
@@ -206,20 +208,20 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
         ht_ctx_->CodegenHashCurrentRow(state, true, &murmur_hash_fn));

     // Codegen for evaluating build rows
-    Function* eval_row_fn;
-    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_row_fn));
+    Function* eval_build_row_fn;
+    codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_build_row_fn));

     if (codegen_status.ok()) {
       // Codegen for build path
       build_codegen_status =
-          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_row_fn);
+          CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_build_row_fn);
       if (build_codegen_status.ok()) build_codegen_enabled = true;
       // Codegen for probe path
       probe_codegen_status = CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn);
       if (probe_codegen_status.ok()) probe_codegen_enabled = true;
       // Codegen for InsertBatch()
       insert_codegen_status = CodegenInsertBatch(state, hash_fn, murmur_hash_fn,
-          eval_row_fn);
+          eval_build_row_fn);
       if (insert_codegen_status.ok()) ht_construction_codegen_enabled = true;
     } else {
       build_codegen_status = codegen_status;
@@ -324,8 +326,7 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
   : parent_(parent),
     is_closed_(false),
     is_spilled_(false),
-    level_(level),
-    null_bitmap_(state->batch_size()) {
+    level_(level) {
   build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
       state->block_mgr(), parent_->block_mgr_client_,
       true /* use_initial_small_buffers */, false /* read_write */);
@@ -334,8 +335,6 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
       state->block_mgr(), parent_->block_mgr_client_,
       true /* use_initial_small_buffers */, false /* read_write */ );
   DCHECK(probe_rows_ != NULL);
-  hash_values_.resize(state->batch_size());
-  null_bitmap_.SetAllBits(false);
 }

 PartitionedHashJoinNode::Partition::~Partition() {
@@ -465,6 +464,7 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
   DCHECK_EQ(batch.num_rows(), indices.size());
   DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
       << build_rows()->RowConsumesMemory();
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
   SCOPED_TIMER(parent_->build_timer_);
   if (parent_->insert_batch_fn_ != NULL) {
     InsertBatchFn insert_batch_fn;
@@ -474,9 +474,13 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
       insert_batch_fn = parent_->insert_batch_fn_;
     }
     DCHECK(insert_batch_fn != NULL);
-    if (UNLIKELY(!insert_batch_fn(this, ctx, &batch, indices))) goto not_built;
+    if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
+      goto not_built;
+    }
   } else {
-    if (UNLIKELY(!InsertBatch(ctx, &batch, indices))) goto not_built;
+    if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) {
+      goto not_built;
+    }
   }
   RETURN_IF_ERROR(state->GetQueryStatus());
   parent_->FreeLocalAllocations();
@@ -652,7 +656,6 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
     DCHECK(new_partition != NULL);
     hash_partitions_.push_back(partition_pool_->Add(new_partition));
     RETURN_IF_ERROR(new_partition->build_rows()->Init(id(), runtime_profile(), true));
-    // Initialize a buffer for the probe here to make sure why have it if we need it.
     // While this is not strictly necessary (there are some cases where we won't need this
     // buffer), the benefit is low. Not grabbing this buffer means there is an additional
@@ -671,6 +674,8 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
   while (!eos) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
+    // 'probe_expr_ctxs_' should have made no local allocations in this function.
+    DCHECK(!ExprContext::HasLocalAllocations(probe_expr_ctxs_));
     if (input_partition_ == NULL) {
       // If we are still consuming batches from the build side.
       {
@@ -887,6 +892,43 @@ int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
   return max_rows;
 }

+int PartitionedHashJoinNode::ProcessProbeBatch(
+    const TJoinOp::type join_op, TPrefetchMode::type prefetch_mode,
+    RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status) {
+  switch (join_op) {
+    case TJoinOp::INNER_JOIN:
+      return ProcessProbeBatch<TJoinOp::INNER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::LEFT_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::LEFT_SEMI_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::LEFT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(prefetch_mode,
+          out_batch, ht_ctx, status);
+    case TJoinOp::RIGHT_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::RIGHT_SEMI_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::RIGHT_ANTI_JOIN:
+      return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    case TJoinOp::FULL_OUTER_JOIN:
+      return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(prefetch_mode, out_batch,
+          ht_ctx, status);
+    default:
+      DCHECK(false) << "Unknown join type";
+      return -1;
+  }
+}
+
 Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch,
     bool* eos) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -951,16 +993,19 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
     // Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
     // in the xcompiled function, so call it here instead.
     int rows_added = 0;
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     SCOPED_TIMER(probe_timer_);
     if (process_probe_batch_fn_ == NULL) {
-      rows_added = ProcessProbeBatch(join_op_, out_batch, ht_ctx_.get(), &status);
+      rows_added = ProcessProbeBatch(join_op_, prefetch_mode, out_batch, ht_ctx_.get(),
+          &status);
     } else {
       DCHECK(process_probe_batch_fn_level0_ != NULL);
       if (ht_ctx_->level() == 0) {
-        rows_added = process_probe_batch_fn_level0_(this, out_batch, ht_ctx_.get(),
-            &status);
+        rows_added = process_probe_batch_fn_level0_(this, prefetch_mode, out_batch,
+            ht_ctx_.get(), &status);
       } else {
-        rows_added = process_probe_batch_fn_(this, out_batch, ht_ctx_.get(), &status);
+        rows_added = process_probe_batch_fn_(this, prefetch_mode, out_batch,
+            ht_ctx_.get(), &status);
       }
     }
     if (UNLIKELY(rows_added < 0)) {
@@ -982,6 +1027,10 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
   } else {
     RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
   }
+  // Free local allocations of the probe side expressions only after ExprValuesCache
+  // has been reset.
+  DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
+  ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
   // We want to return as soon as we have attached a tuple stream to the out_batch
   // (before preparing a new partition). The attached tuple stream will be recycled
@@ -1615,6 +1664,7 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
       codegen->CloneFunction(process_build_batch_fn);

   // Always build runtime filters at level0 (if there are any).
+  // Note that the first argument of this function is the return value.
   Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 3);
   build_filters_l0_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
@@ -1630,7 +1680,8 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
   DCHECK_EQ(replaced, 1);

   // Never build filters after repartitioning, as all rows have already been added to the
-  // filters during the level0 build.
+  // filters during the level0 build. Note that the first argument of this function is
+  // the return value.
   Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 3);
   build_filters_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
@@ -1698,21 +1749,26 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
   DCHECK(process_probe_batch_fn != NULL);
   process_probe_batch_fn->setName("ProcessProbeBatch");

-  // Since ProcessProbeBatch() is a templated function, it has linkonce_odr linkage, which
-  // means the function can be removed if it's not referenced. Change to weak_odr, which
-  // has the same semantics except it can't be removed.
-  // See http://llvm.org/docs/LangRef.html#linkage-types
-  DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::LinkOnceODRLinkage)
+  // Verifies that ProcessProbeBatch() has weak_odr linkage so it's not discarded even
+  // if it's not referenced. See http://llvm.org/docs/LangRef.html#linkage-types
+  DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::WeakODRLinkage)
       << LlvmCodeGen::Print(process_probe_batch_fn);
-  process_probe_batch_fn->setLinkage(GlobalValue::WeakODRLinkage);

   // Bake in %this pointer argument to process_probe_batch_fn.
   Value* this_arg = codegen->GetArgument(process_probe_batch_fn, 0);
   Value* this_loc = codegen->CastPtrToLlvmPtr(this_arg->getType(), this);
   this_arg->replaceAllUsesWith(this_loc);

+  // Replace the parameter 'prefetch_mode' with a constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(process_probe_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   // Bake in %ht_ctx pointer argument to process_probe_batch_fn
-  Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 2);
+  Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 3);
   Value* ht_ctx_loc = codegen->CastPtrToLlvmPtr(ht_ctx_arg->getType(), ht_ctx_.get());
   ht_ctx_arg->replaceAllUsesWith(ht_ctx_loc);
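Both codegen'd paths bake 'prefetch_mode' in as a compile-time constant: every use of
the argument is rewritten to a ConstantInt, after which LLVM can fold away the
'if (prefetch_mode != TPrefetchMode::NONE)' branches entirely. A hedged sketch of that
argument-baking step using plain LLVM APIs (Impala goes through its LlvmCodeGen
wrappers such as GetArgument() instead; the helper name here is invented):

#include <cstdint>
#include <iterator>

#include "llvm/IR/Constants.h"
#include "llvm/IR/Function.h"
#include "llvm/IR/Type.h"

// Rewrites the arg_idx-th argument of 'fn' to the 32-bit constant 'value'.
void BakeInI32Argument(llvm::Function* fn, unsigned arg_idx, int32_t value) {
  llvm::Function::arg_iterator arg_it = fn->arg_begin();
  std::advance(arg_it, arg_idx);  // walk to the requested argument
  llvm::Value* constant =
      llvm::ConstantInt::get(llvm::Type::getInt32Ty(fn->getContext()), value);
  // Every use of the argument now reads the constant, so later optimization
  // passes can eliminate the branches that tested it.
  arg_it->replaceAllUsesWith(constant);
}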
@@ -1823,9 +1879,17 @@ Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
   Function* build_equals_fn;
   RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, true, &build_equals_fn));

+  // Replace the parameter 'prefetch_mode' with a constant.
+  Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
+  TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+  DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   // Use codegen'd EvalBuildRow() function
   int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
-  DCHECK_EQ(replaced, 2);
+  DCHECK_EQ(replaced, 1);

   // Use codegen'd Equals() function
   replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 37c2c4f..a76dced 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -189,7 +189,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {

   /// Probes and updates the hash table for the current probe row for either
   /// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build
-  /// rows will be appended to the 'out_batch'; For RIGHT_ANTI_JOIN, update the
+  /// rows will be appended to the output batch; for RIGHT_ANTI_JOIN, update the
   /// hash table only if matches are found. The actual output happens in
   /// OutputUnmatchedBuild(). Returns true if probing is done for the current
   /// probe row and should continue to next row.
@@ -207,8 +207,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {

   /// Probes the hash table for the current probe row for LEFT_SEMI_JOIN,
   /// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended
-  /// to 'out_batch' if it's part of the output. Returns true if probing
-  /// is done for the current probe row and should continue to next row.
+  /// to the output batch if there is a match (for LEFT_SEMI_JOIN) or if there is no
+  /// match (for LEFT_ANTI_JOIN). Returns true if probing is done for the current
+  /// probe row and should continue to next row.
   ///
   /// 'out_batch_iterator' is the iterator for the output batch.
   /// 'remaining_capacity' tracks the number of additional rows that can be added to
@@ -223,7 +224,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {

   /// Probes the hash table for the current probe row for LEFT_OUTER_JOIN,
   /// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row
-  /// will appended to 'out_batch'. For RIGHT/FULL_OUTER_JOIN, some of the outputs
+  /// will be appended to the output batch. For RIGHT/FULL_OUTER_JOIN, some of the outputs
   /// are added in OutputUnmatchedBuild(). Returns true if probing is done for the
   /// current probe row and should continue to next row.
   ///
@@ -239,6 +240,25 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
       ExprContext* const* conjunct_ctxs, int num_conjuncts,
       RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);

+  /// Probes 'current_probe_row_' against the hash tables and appends outputs to the
+  /// output batch. Wrapper around the join-type specific probe row functions
+  /// declared above.
+  template<int const JoinOp>
+  bool inline ProcessProbeRow(
+      ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+      ExprContext* const* conjunct_ctxs, int num_conjuncts,
+      RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status);
+
+  /// Evaluates some number of rows in 'probe_batch_' against the probe expressions
+  /// and hashes the results to 32-bit hash values. The evaluation results and the hash
+  /// values are stored in the expression values cache in 'ht_ctx'. The number of rows
+  /// processed depends on the capacity available in 'ht_ctx->expr_values_cache_'.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
+  /// hash table buckets will be prefetched based on the hash values computed. Note
+  /// that 'prefetch_mode' will be substituted with constants at codegen time.
+  void EvalAndHashProbePrefetchGroup(TPrefetchMode::type prefetch_mode,
+      HashTableCtx* ctx);
+
   /// Find the next probe row. Returns true if a probe row is found. In which case,
   /// 'current_probe_row_' and 'hash_tbl_iterator_' have been set up to point to the
   /// next probe row and its corresponding partition. 'status' may be updated if
@@ -246,7 +266,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   template<int const JoinOp>
   bool inline NextProbeRow(
       HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
-      int* remaining_capacity, int num_other_join_conjuncts, Status* status);
+      int* remaining_capacity, Status* status);

   /// Process probe rows from probe_batch_. Returns either if out_batch is full or
   /// probe_batch_ is entirely consumed.
@@ -256,11 +276,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// set). This function doesn't commit rows to the output batch so it's the caller's
   /// responsibility to do so.
   template<int const JoinOp>
-  int ProcessProbeBatch(RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
+  int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, HashTableCtx* ht_ctx,
+      Status* status);

   /// Wrapper that calls the templated version of ProcessProbeBatch() based on 'join_op'.
-  int ProcessProbeBatch(const TJoinOp::type join_op, RowBatch* out_batch,
-      HashTableCtx* ht_ctx, Status* status);
+  int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
+      RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);

   /// Sweep the hash_tbl_ of the partition that is at the front of
   /// output_build_partitions_, using hash_tbl_iterator_ and output any unmatched build
@@ -553,9 +574,13 @@ class PartitionedHashJoinNode : public BlockingJoinNode {

     /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
     /// containing the index of each row's index into the hash table's tuple stream.
-    /// This function is replaced with a codegen'd version.
-    bool InsertBatch(HashTableCtx* ctx, RowBatch* batch,
-        const std::vector<BufferedTupleStream::RowIdx>& indices);
+    /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
+    /// table buckets which the rows hash to will be prefetched. This parameter is
+    /// replaced with a constant at codegen time. This function may be replaced with
+    /// a codegen'd version. Returns true if all rows in 'batch' are successfully
+    /// inserted.
+    bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
+        RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);

     PartitionedHashJoinNode* parent_;

@@ -581,13 +606,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
     /// If NULL, ownership has been transfered.
     BufferedTupleStream* build_rows_;
     BufferedTupleStream* probe_rows_;
-
-    /// Store hash values of each row for the current batch computed during prefetching.
-    std::vector<uint32_t> hash_values_;
-
-    /// Bitmap to indicate rows evaluated to NULL for the current batch when building
-    /// hash tables.
-    Bitmap null_bitmap_;
   };

   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
@@ -600,14 +618,14 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   ProcessBuildBatchFn process_build_batch_fn_;
   ProcessBuildBatchFn process_build_batch_fn_level0_;

-  typedef int (*ProcessProbeBatchFn)(
-      PartitionedHashJoinNode*, RowBatch*, HashTableCtx*, Status*);
+  typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*,
+      TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*);
   /// Jitted ProcessProbeBatch function pointers. NULL if codegen is disabled.
   ProcessProbeBatchFn process_probe_batch_fn_;
   ProcessProbeBatchFn process_probe_batch_fn_level0_;

-  typedef bool (*InsertBatchFn)(Partition*, HashTableCtx*, RowBatch*,
-      const std::vector<BufferedTupleStream::RowIdx>&);
+  typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*,
+      RowBatch*, const std::vector<BufferedTupleStream::RowIdx>&);
   /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
   InsertBatchFn insert_batch_fn_;
   InsertBatchFn insert_batch_fn_level0_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index d63df1e..8ebeab3 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -26,6 +26,7 @@ inline void PartitionedHashJoinNode::ResetForProbe() {
   probe_batch_pos_ = 0;
   matched_probe_ = true;
   hash_tbl_iterator_.SetAtEnd();
+  ht_ctx_->expr_values_cache()->Reset();
 }

 inline bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream,
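ResetForProbe() now also resets the ExprValuesCache, whose write-then-read protocol
(Reset/SetRowNull/NextRow during evaluation, then ResetForRead/IsRowNull/ExprValuesHash
during processing) the earlier hunks rely on. The toy class below is an invented
approximation of that protocol only — the real cache also stores the evaluated
expression values themselves, not just hashes and null bits:

#include <cstdint>
#include <vector>

class MiniValuesCache {
 public:
  explicit MiniValuesCache(int capacity)
    : hashes_(capacity), nulls_(capacity), pos_(0), filled_(0) {}

  int capacity() const { return static_cast<int>(hashes_.size()); }
  bool AtEnd() const { return pos_ == filled_; }
  void Reset() { pos_ = 0; filled_ = capacity(); }   // start a write pass
  void ResetForRead() { filled_ = pos_; pos_ = 0; }  // replay what was written
  void NextRow() { ++pos_; }

  // Write-pass setters; the real cache fills these during EvalAndHash*().
  void SetHash(uint32_t h) { hashes_[pos_] = h; nulls_[pos_] = false; }
  void SetRowNull() { nulls_[pos_] = true; }

  // Read-pass accessors.
  uint32_t ExprValuesHash() const { return hashes_[pos_]; }
  bool IsRowNull() const { return nulls_[pos_]; }

 private:
  std::vector<uint32_t> hashes_;
  std::vector<bool> nulls_;
  int pos_;
  int filled_;
};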
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.cc b/be/src/exprs/expr-context.cc
index 7231c5e..2a01bfa 100644
--- a/be/src/exprs/expr-context.cc
+++ b/be/src/exprs/expr-context.cc
@@ -110,8 +110,23 @@ Status ExprContext::Clone(RuntimeState* state, ExprContext** new_ctx) {
   return root_->Open(state, *new_ctx, FunctionContext::THREAD_LOCAL);
 }

-void ExprContext::FreeLocalAllocations() {
-  FreeLocalAllocations(fn_contexts_);
+bool ExprContext::HasLocalAllocations(const vector<ExprContext*>& ctxs) {
+  for (int i = 0; i < ctxs.size(); ++i) {
+    if (ctxs[i]->HasLocalAllocations()) return true;
+  }
+  return false;
+}
+
+bool ExprContext::HasLocalAllocations() {
+  return HasLocalAllocations(fn_contexts_);
+}
+
+bool ExprContext::HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs) {
+  for (int i = 0; i < fn_ctxs.size(); ++i) {
+    if (fn_ctxs[i]->impl()->closed()) continue;
+    if (fn_ctxs[i]->impl()->HasLocalAllocations()) return true;
+  }
+  return false;
 }

 void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
@@ -120,6 +135,10 @@ void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
   }
 }

+void ExprContext::FreeLocalAllocations() {
+  FreeLocalAllocations(fn_contexts_);
+}
+
 void ExprContext::FreeLocalAllocations(const vector<FunctionContext*>& fn_ctxs) {
   for (int i = 0; i < fn_ctxs.size(); ++i) {
     if (fn_ctxs[i]->impl()->closed()) continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 0816774..9078ce6 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -123,10 +123,16 @@ class ExprContext {
   TimestampVal GetTimestampVal(TupleRow* row);
   DecimalVal GetDecimalVal(TupleRow* row);

+  /// Returns true if any of the expression contexts in the array has local allocations.
+  /// The last two are helper functions.
+  static bool HasLocalAllocations(const std::vector<ExprContext*>& ctxs);
+  bool HasLocalAllocations();
+  static bool HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs);
+
   /// Frees all local allocations made by fn_contexts_. This can be called when result
-  /// data from this context is no longer needed.
-  void FreeLocalAllocations();
+  /// data from this context is no longer needed. The last two are helper functions.
   static void FreeLocalAllocations(const std::vector<ExprContext*>& ctxs);
+  void FreeLocalAllocations();
   static void FreeLocalAllocations(const std::vector<FunctionContext*>& ctxs);

   static const char* LLVM_CLASS_NAME;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aac0043..3f2ba29 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -143,16 +143,21 @@ class RowBatch {
   }

   /// An iterator for going through a row batch, starting at 'row_idx'.
-  /// This is more efficient than using GetRow() as it avoids loading the
-  /// row batch state and doing multiplication on each loop with GetRow().
+  /// If 'limit' is specified, it will iterate up to row number 'row_idx + limit'
+  /// or the last row, whichever comes first. Otherwise, it will iterate until the last
+  /// row in the batch. This is more efficient than using GetRow() as it avoids loading
+  /// the row batch state and doing multiplication on each loop with GetRow().
   class Iterator {
    public:
-    IR_ALWAYS_INLINE Iterator(RowBatch* parent, int row_idx) :
+    Iterator(RowBatch* parent, int row_idx, int limit = -1) :
         num_tuples_per_row_(parent->num_tuples_per_row_),
-        row_(parent->tuple_ptrs_ + row_idx * num_tuples_per_row_),
-        row_batch_end_(parent->tuple_ptrs_ + parent->num_rows_ * num_tuples_per_row_),
+        row_(parent->tuple_ptrs_ + num_tuples_per_row_ * row_idx),
+        row_batch_end_(parent->tuple_ptrs_ + num_tuples_per_row_ *
+            (limit == -1 ? parent->num_rows_ :
+             std::min<int>(row_idx + limit, parent->num_rows_))),
         parent_(parent) {
       DCHECK_GE(row_idx, 0);
+      DCHECK_GT(num_tuples_per_row_, 0);
       /// We allow empty row batches with num_rows_ == capacity_ == 0.
       /// That's why we cannot call GetRow() above to initialize 'row_'.
       DCHECK_LE(row_idx, parent->capacity_);
@@ -397,12 +402,17 @@ class RowBatch {
 }

-/// Macro for iterating through '_row_batch', starting at '_start_row_idx'.
+/// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
 /// '_row_batch' is the row batch to iterate through.
 /// '_start_row_idx' is the starting row index.
 /// '_iter' is the iterator.
+/// '_limit' is the max number of rows to iterate over.
 #define FOREACH_ROW(_row_batch, _start_row_idx, _iter) \
     for (RowBatch::Iterator _iter(_row_batch, _start_row_idx); \
          !_iter.AtEnd(); _iter.Next())

+#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter) \
+    for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit); \
+         !_iter.AtEnd(); _iter.Next())
+
 #endif
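The new FOREACH_ROW_LIMIT macro and the Iterator 'limit' parameter above are what let
the build and probe paths walk a batch one prefetch group at a time. A self-contained
sketch of the same iterator-with-limit idea over a plain vector (names invented): the
end pointer is computed once in the constructor so the loop condition stays a single
pointer comparison.

#include <algorithm>
#include <cstdio>
#include <vector>

struct IntBatchIter {
  const int* row;
  const int* end;
  IntBatchIter(const std::vector<int>& batch, int start_idx, int limit = -1)
    : row(batch.data() + start_idx),
      end(batch.data() + (limit == -1 ? batch.size()
                          : std::min<size_t>(start_idx + limit, batch.size()))) {}
  bool AtEnd() const { return row >= end; }
  void Next() { ++row; }
};

int main() {
  std::vector<int> batch = {10, 11, 12, 13, 14, 15, 16};
  // Walk the batch in "prefetch groups" of 3, like FOREACH_ROW_LIMIT does.
  for (size_t start = 0; start < batch.size(); start += 3) {
    for (IntBatchIter it(batch, start, 3); !it.AtEnd(); it.Next()) {
      std::printf("%d ", *it.row);
    }
    std::printf("| ");  // prints: 10 11 12 | 13 14 15 | 16 |
  }
  return 0;
}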
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 99b227c..39fc7cd 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -78,6 +78,7 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
       block_mgr_parent_tracker_.get(), (*runtime_state)->runtime_profile(),
       tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
   (*runtime_state)->set_block_mgr(mgr);
+  (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);

   query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 274776c..d12ce1f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -390,6 +390,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
             iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::PREFETCH_MODE: {
+        if (iequals(value, "NONE") || iequals(value, "0")) {
+          query_options->__set_prefetch_mode(TPrefetchMode::NONE);
+        } else if (iequals(value, "HT_BUCKET") || iequals(value, "1")) {
+          query_options->__set_prefetch_mode(TPrefetchMode::HT_BUCKET);
+        } else {
+          return Status(Substitute("Invalid prefetch mode '$0'. Valid modes are "
+              "NONE(0) or HT_BUCKET(1)", value));
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.
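SetQueryOption() above accepts either the symbolic name or the ordinal for the new
PREFETCH_MODE option. A generic, self-contained sketch of that case-insensitive
string-to-enum parse (Impala itself uses boost's iequals and returns a Status):

#include <cctype>
#include <cstddef>
#include <string>

enum class PrefetchMode { NONE = 0, HT_BUCKET = 1 };

// Case-insensitive comparison of 's' against an ASCII keyword.
static bool IEquals(const std::string& s, const char* kw) {
  size_t i = 0;
  for (; kw[i] != '\0'; ++i) {
    if (i >= s.size() ||
        std::tolower(static_cast<unsigned char>(s[i])) !=
        std::tolower(static_cast<unsigned char>(kw[i]))) {
      return false;
    }
  }
  return i == s.size();
}

// Mirrors the PREFETCH_MODE case above: accept the name or the ordinal.
// Returns false for unrecognized input so the caller can report the error.
bool ParsePrefetchMode(const std::string& value, PrefetchMode* mode) {
  if (IEquals(value, "NONE") || value == "0") {
    *mode = PrefetchMode::NONE;
    return true;
  }
  if (IEquals(value, "HT_BUCKET") || value == "1") {
    *mode = PrefetchMode::HT_BUCKET;
    return true;
  }
  return false;
}

With the option wired up, a client can disable prefetching per query (e.g.
SET PREFETCH_MODE=NONE); the thrift default added below is HT_BUCKET, so prefetching is
on unless a query opts out.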
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fb24530..60c122d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\
+      TImpalaQueryOptions::PREFETCH_MODE + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -78,7 +78,8 @@ class TQueryOptions;
   QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
   QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
   QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
-  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE);
+  QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\
+  QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE);

 /// Converts a TQueryOptions struct into a map of key, value pairs.
 void TQueryOptionsToMap(const TQueryOptions& query_options,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 3e5993b..10ce43b 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -84,6 +84,9 @@ class FunctionContextImpl {
   /// Frees all allocations returned by AllocateLocal().
   void FreeLocalAllocations();

+  /// Returns true if there are any allocations returned by AllocateLocal().
+  bool HasLocalAllocations() const { return !local_allocations_.empty(); }
+
   /// Sets constant_args_. The AnyVal* values are owned by the caller.
   void SetConstantArgs(const std::vector<impala_udf::AnyVal*>& constant_args);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 74bef28..ada1a20 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -193,6 +193,9 @@ struct TQueryOptions {

   // Maximum runtime filter size, in bytes
   47: optional i32 runtime_filter_max_size = 16777216
+
+  // Prefetching behavior during hash tables' building and probing.
+  48: optional Types.TPrefetchMode prefetch_mode = Types.TPrefetchMode.HT_BUCKET
 }

 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 9421ec4..29ae4cc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -219,7 +219,10 @@ enum TImpalaQueryOptions {
   RUNTIME_FILTER_MAX_SIZE,

   // Minimum runtime filter size, in bytes.
-  RUNTIME_FILTER_MIN_SIZE
+  RUNTIME_FILTER_MIN_SIZE,
+
+  // Prefetching behavior during hash tables' building and probing.
+  PREFETCH_MODE
 }

 // The summary of an insert.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 85a14dc..24f4524 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -120,6 +120,14 @@ enum TRuntimeFilterMode {
   GLOBAL
 }

+enum TPrefetchMode {
+  // No prefetching at all.
+  NONE,
+
+  // Prefetch the hash table buckets.
+  HT_BUCKET
+}
+
 // A TNetworkAddress is the standard host, port representation of a
 // network address. The hostname field must be resolvable to an IPv4
 // address.
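Thrift's C++ generator emits an enum like TPrefetchMode as a struct-wrapped enum, which
is why the backend diffs above pass values around as TPrefetchMode::type. A hand-written
approximation of the generated code (the real header is produced by the thrift compiler,
not written by hand):

// Approximation of what thrift generates for 'enum TPrefetchMode' in
// Types_types.h (thrift 0.9-era C++ generator; illustrative only).
struct TPrefetchMode {
  enum type {
    NONE = 0,
    HT_BUCKET = 1
  };
};

// Call sites then take the enum as 'TPrefetchMode::type', as the
// ProcessProbeBatch()/InsertBatch() signatures above do:
int PrefetchEnabled(TPrefetchMode::type prefetch_mode) {
  return prefetch_mode != TPrefetchMode::NONE;
}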
