http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h new file mode 100644 index 0000000..23822b2 --- /dev/null +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -0,0 +1,460 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H +#define IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H + +#include <boost/scoped_ptr.hpp> +#include <memory> + +#include "common/object-pool.h" +#include "common/status.h" +#include "exec/data-sink.h" +#include "exec/filter-context.h" +#include "exec/hash-table.h" +#include "runtime/buffered-block-mgr.h" +#include "runtime/buffered-tuple-stream.h" + +#include "gen-cpp/PlanNodes_types.h" + +namespace impala { + +class ExprContext; +class RowDescriptor; +class RuntimeState; + +/// The build side for the PartitionedHashJoinNode. Build-side rows are hash-partitioned +/// into PARTITION_FANOUT partitions, with partitions spilled if the full build side +/// does not fit in memory. Spilled partitions can be repartitioned with a different +/// hash function per level of repartitioning. +/// +/// The builder owns the hash tables and build row streams. The builder first does the +/// level 0 partitioning of build rows. After FlushFinal() the builder has produced some +/// in-memory partitions and some spilled partitions. The in-memory partitions have hash +/// tables and the spilled partitions have a probe-side stream prepared with one write +/// buffer, which is sufficient to spill the partition's probe rows to disk without +/// allocating additional buffers. +/// +/// After this initial partitioning, the join node probes the in-memory hash partitions. +/// The join node then drives processing of any spilled partitions, calling +/// Partition::BuildHashTable() to build hash tables for a spilled partition or calling +/// RepartitionBuildInput() to repartition a level n partition into multiple level n + 1 +/// partitions. +/// +/// Both the PartitionedHashJoinNode and the builder share a BufferedBlockMgr client +/// and the corresponding reservations. Different stages of the spilling algorithm +/// require different mixes of build and probe buffers and hash tables, so we can +/// share the reservation to minimize the combined memory requirement. Initial probe-side +/// buffers are allocated in the builder then handed off to the probe side to implement +/// this reservation sharing. +/// +/// TODO: after we have reliable reservations (IMPALA-3200), we can simplify the handoff +/// to the probe side by using reservations instead of preparing the streams. +/// +/// The full hash join algorithm is documented in PartitionedHashJoinNode. +class PhjBuilder : public DataSink { + public: + class Partition; + + PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor& probe_row_desc, + const RowDescriptor& build_row_desc, RuntimeState* state); + + Status Init(RuntimeState* state, const std::vector<TEqJoinCondition>& eq_join_conjuncts, + const std::vector<TRuntimeFilterDesc>& filters); + + /// Implementations of DataSink interface methods. + virtual std::string GetName() override; + virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker) override; + virtual Status Open(RuntimeState* state) override; + virtual Status Send(RuntimeState* state, RowBatch* batch) override; + virtual Status FlushFinal(RuntimeState* state) override; + virtual void Close(RuntimeState* state) override; + + ///////////////////////////////////////// + // The following functions are used only by PartitionedHashJoinNode. + ///////////////////////////////////////// + + /// Reset the builder to the same state as it was in after calling Open(). + void Reset(); + + /// Transfer ownership of the probe streams to the caller. One stream was allocated per + /// spilled partition in FlushFinal(). The probe streams are empty but prepared for + /// writing with a write buffer allocated. + std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams(); + + /// Clears the current list of hash partitions. Called after probing of the partitions + /// is done. The partitions are not closed or destroyed, since they may be spilled or + /// may contain unmatched build rows for certain join modes (e.g. right outer join). + void ClearHashPartitions() { hash_partitions_.clear(); } + + /// Close the null aware partition (if there is one) and set it to NULL. + void CloseNullAwarePartition(RowBatch* out_batch) { + if (null_aware_partition_ != NULL) { + null_aware_partition_->Close(out_batch); + null_aware_partition_ = NULL; + } + } + + /// Creates new hash partitions and repartitions 'input_partition'. The previous + /// hash partitions must have been cleared with ClearHashPartitions(). + /// 'level' is the level new partitions should be created with. This functions prepares + /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase + /// has enough buffers preallocated to execute successfully. + Status RepartitionBuildInput( + Partition* input_partition, int level, BufferedTupleStream* input_probe_rows); + + /// Returns the largest build row count out of the current hash partitions. + int64_t LargestPartitionRows() const; + + /// True if the hash table may contain rows with one or more NULL join keys. This + /// depends on the join type and the equijoin conjuncts. + bool HashTableStoresNulls() const; + + /// Accessor functions, mainly required to expose state to PartitionedHashJoinNode. + inline BufferedBlockMgr::Client* block_mgr_client() const { return block_mgr_client_; } + inline bool non_empty_build() const { return non_empty_build_; } + inline const std::vector<bool>& is_not_distinct_from() const { + return is_not_distinct_from_; + } + inline int num_hash_partitions() const { return hash_partitions_.size(); } + inline Partition* hash_partition(int partition_idx) const { + DCHECK_GE(partition_idx, 0); + DCHECK_LT(partition_idx, hash_partitions_.size()); + return hash_partitions_[partition_idx]; + } + inline Partition* null_aware_partition() const { return null_aware_partition_; } + + std::string DebugString() const; + + /// Number of initial partitions to create. Must be a power of two. + static const int PARTITION_FANOUT = 16; + + /// Needs to be log2(PARTITION_FANOUT). + static const int NUM_PARTITIONING_BITS = 4; + + /// Maximum number of times we will repartition. The maximum build table we + /// can process is: + /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). With a (low) 1GB + /// limit and 64 fanout, we can support 256TB build tables in the case where + /// there is no skew. + /// In the case where there is skew, repartitioning is unlikely to help (assuming a + /// reasonable hash function). + /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx. + /// TODO: we can revisit and try harder to explicitly detect skew. + static const int MAX_PARTITION_DEPTH = 16; + + /// A partition containing a subset of build rows. + /// + /// A partition may contain two data structures: the build rows and optionally a hash + /// table built over the build rows. Building the hash table requires all build rows to + /// be pinned in memory. The build rows are kept in memory if all partitions fit in + /// memory, but can be unpinned and spilled to disk to free up memory. Reading or + /// writing the unpinned rows requires a single read or write buffer. If the unpinned + /// rows are not being read or written, they can be completely unpinned, requiring no + /// buffers. + /// + /// The build input is first partitioned by hash function level 0 into the level 0 + /// partitions. Then, if the join spills and the size of a level n partition is too + /// large to fit in memory, the partition's rows can be repartitioned with the level + /// n + 1 hash function into the level n + 1 partitions. + class Partition { + public: + Partition(RuntimeState* state, PhjBuilder* parent, int level); + ~Partition(); + + /// Close the partition and attach resources to 'batch' if non-NULL or free the + /// resources if 'batch' is NULL. Idempotent. + void Close(RowBatch* batch); + + /// Returns the estimated byte size of the in-memory data structures for this + /// partition. This includes all build rows and the hash table. + int64_t EstimatedInMemSize() const; + + /// Pins the build tuples for this partition and constructs the hash table from it. + /// Build rows cannot be added after calling this. If the build rows could not be + /// pinned or the hash table could not be built due to memory pressure, sets *built + /// to false and returns OK. Returns an error status if any other error is + /// encountered. + Status BuildHashTable(bool* built); + + /// Spills this partition, the partition's stream is unpinned with 'mode' and + /// its hash table is destroyed if it was built. + Status Spill(BufferedTupleStream::UnpinMode mode); + + bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; } + BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); } + HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); } + bool ALWAYS_INLINE is_spilled() const { return is_spilled_; } + int ALWAYS_INLINE level() const { return level_; } + + private: + /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array + /// containing the index of each row's index into the hash table's tuple stream. + /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash + /// table buckets which the rows hashes to will be prefetched. This parameter is + /// replaced with a constant during codegen time. This function may be replaced with + /// a codegen'd version. Returns true if all rows in 'batch' are successfully + /// inserted. + bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx, + RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices); + + const PhjBuilder* parent_; + + /// True if this partition is spilled. + bool is_spilled_; + + /// How many times rows in this partition have been repartitioned. Partitions created + /// from the node's children's input is level 0, 1 after the first repartitioning, + /// etc. + const int level_; + + /// The hash table for this partition. + boost::scoped_ptr<HashTable> hash_tbl_; + + /// Stream of build tuples in this partition. Initially owned by this object but + /// transferred to the parent exec node (via the row batch) when the partition + /// is closed. If NULL, ownership has been transferred and the partition is closed. + std::unique_ptr<BufferedTupleStream> build_rows_; + }; + + private: + /// Computes the minimum number of buffers required to execute the spilling partitioned + /// hash algorithm successfully for any input size (assuming enough disk space is + /// available for spilled rows). The buffers are used for buffering both build and + /// probe rows at different times, so the total requirement is the peak sum of build + /// and probe buffers required. + /// We need one output buffer per partition to partition the build or probe side. We + /// need one additional buffer for the input while repartitioning the build or probe. + /// For NAAJ, we need 3 additional buffers for 'null_aware_partition_', + /// 'null_aware_probe_partition_' and 'null_probe_rows_'. + int MinRequiredBuffers() const { + int num_reserved_buffers = PARTITION_FANOUT + 1; + if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3; + return num_reserved_buffers; + } + + /// Create and initialize a set of hash partitions for partitioning level 'level'. + /// The previous hash partitions must have been cleared with ClearHashPartitions(). + /// After calling this, batches are added to the new partitions by calling Send(). + Status CreateHashPartitions(int level); + + /// Create a new partition in 'all_partitions_' and prepare it for writing. + Status CreateAndPreparePartition(int level, Partition** partition); + + /// Reads the rows in build_batch and partitions them into hash_partitions_. If + /// 'build_filters' is true, runtime filters are populated. + Status ProcessBuildBatch(RowBatch* build_batch, HashTableCtx* ctx, bool build_filters); + + /// Append 'row' to 'stream'. In the common case, appending the row to the stream + /// immediately succeeds. Otherwise this function falls back to the slower path of + /// AppendRowStreamFull(), which may spill partitions to free memory. Returns an error + /// if it was unable to append the row, even after spilling partitions. + Status AppendRow(BufferedTupleStream* stream, TupleRow* row); + + /// Slow path for AppendRow() above. It is called when the stream has failed to append + /// the row. We need to find more memory by either switching to IO-buffers, in case the + /// stream still uses small buffers, or spilling a partition. + Status AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row); + + /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed + /// to the Spill() call for the selected partition. The current policy is to spill the + /// largest partition. Returns non-ok status if we couldn't spill a partition. + Status SpillPartition(BufferedTupleStream::UnpinMode mode); + + /// Tries to build hash tables for all unspilled hash partitions. Called after + /// FlushFinal() when all build rows have been partitioned and added to the appropriate + /// streams. If the hash table could not be built for a partition, the partition is + /// spilled (with all build blocks unpinned) and the probe stream is prepared for + /// writing (i.e. has an initial probe buffer allocated). + /// + /// When this function returns successfully, each partition is in one of these states: + /// 1. closed. No probe partition is created and the build partition is closed. + /// 2. in-memory. The build rows are pinned and has a hash table built. No probe + /// partition is created. + /// 3. spilled. The build rows are fully unpinned and the probe stream is prepared. + Status BuildHashTablesAndPrepareProbeStreams(); + + /// Ensures that 'spilled_partition_probe_streams_' has a stream per spilled partition + /// in 'hash_partitions_'. May spill additional partitions until it can create enough + /// probe streams with write buffers. Returns an error if an error is encountered or + /// if it runs out of partitions to spill. + Status InitSpilledPartitionProbeStreams(); + + /// Calls Close() on every Partition, deletes them, and cleans up any pointers that + /// may reference them. Also cleans up 'spilled_partition_probe_streams_'. + void CloseAndDeletePartitions(); + + /// For each filter in filters_, allocate a bloom_filter from the fragment-local + /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build + /// phase. + void AllocateRuntimeFilters(); + + /// Publish the runtime filters to the fragment-local RuntimeFilterBank. + /// 'num_build_rows' is used to determine whether the computed filters have an + /// unacceptably high false-positive rate. + void PublishRuntimeFilters(int64_t num_build_rows); + + /// Does all codegen for the builder (if codegen is enabled). + /// Updates the the builder's runtime profile with info about whether the codegen was + /// enabled and whether any errors occured during codegen. + void Codegen(RuntimeState* state); + + /// Codegen processing build batches. Identical signature to ProcessBuildBatch(). + /// Returns non-OK status if codegen was not possible. + Status CodegenProcessBuildBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn, + llvm::Function* eval_row_fn); + + /// Codegen inserting batches into a partition's hash table. Identical signature to + /// Partition::InsertBatch(). Returns non-OK if codegen was not possible. + Status CodegenInsertBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn, + llvm::Function* eval_row_fn); + + RuntimeState* const runtime_state_; + + // The ID of the plan join node this is associated with. + // TODO: we may want to replace this with a sink ID once we progress further with + // multithreading. + const int join_node_id_; + + /// The join operation this is building for. + const TJoinOp::type join_op_; + + /// Descriptor for the probe rows, needed to initialize probe streams. + const RowDescriptor& probe_row_desc_; + + /// Pool for objects with same lifetime as builder. + ObjectPool pool_; + + /// Client to the buffered block mgr, used to allocate build partition buffers and hash + /// tables. When probing, the spilling algorithm keeps some build partitions in memory + /// while using memory for probe buffers for spilled partitions. To support dynamically + /// dividing memory between build and probe, this client is owned by the builder but + /// shared with the PartitionedHashJoinNode. + /// TODO: this approach to sharing will not work for spilling broadcast joins with a + /// 1:N relationship from builders to join nodes. + BufferedBlockMgr::Client* block_mgr_client_; + + /// If true, the build side has at least one row. + bool non_empty_build_; + + /// Expr contexts to free after partitioning or inserting each batch. + std::vector<ExprContext*> expr_ctxs_to_free_; + + /// Expression contexts over input rows for hash table build. + std::vector<ExprContext*> build_expr_ctxs_; + + /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS + /// NOT DISTINCT FROM, rather than equality. + std::vector<bool> is_not_distinct_from_; + + /// List of filters to build. + std::vector<FilterContext> filters_; + + /// Used for hash-related functionality, such as evaluating rows and calculating hashes. + /// The level is set to the same level as 'hash_partitions_'. + boost::scoped_ptr<HashTableCtx> ht_ctx_; + + /// Total number of partitions created. + RuntimeProfile::Counter* partitions_created_; + + /// The largest fraction (of build side) after repartitioning. This is expected to be + /// 1 / PARTITION_FANOUT. A value much larger indicates skew. + RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_; + + /// Level of max partition (i.e. number of repartitioning steps). + RuntimeProfile::HighWaterMarkCounter* max_partition_level_; + + /// Number of build rows that have been partitioned. + RuntimeProfile::Counter* num_build_rows_partitioned_; + + /// Number of hash collisions - unequal rows that have identical hash values + RuntimeProfile::Counter* num_hash_collisions_; + + /// Total number of hash buckets across all partitions. + RuntimeProfile::Counter* num_hash_buckets_; + + /// Number of partitions that have been spilled. + RuntimeProfile::Counter* num_spilled_partitions_; + + /// Number of partitions that have been repartitioned. + RuntimeProfile::Counter* num_repartitions_; + + /// Time spent partitioning build rows. + RuntimeProfile::Counter* partition_build_rows_timer_; + + /// Time spent building hash tables. + RuntimeProfile::Counter* build_hash_table_timer_; + + /// Time spent repartitioning and building hash tables of any resulting partitions + /// that were not spilled. + RuntimeProfile::Counter* repartition_timer_; + + ///////////////////////////////////////// + /// BEGIN: Members that must be Reset() + + /// Vector that owns all of the Partition objects. + std::vector<std::unique_ptr<Partition>> all_partitions_; + + /// The current set of partitions that are being built or probed. This vector is + /// initialized before partitioning or re-partitioning the build input + /// and cleared after we've finished probing the partitions. + /// This is not used when processing a single spilled partition. + std::vector<Partition*> hash_partitions_; + + /// Partition used for null-aware joins. This partition is always processed at the end + /// after all build and probe rows are processed. In this partition's 'build_rows_', we + /// store all the rows for which 'build_expr_ctxs_' evaluated over the row returns NULL + /// (i.e. it has a NULL on the eq join slot). + /// NULL if the join is not null aware or we are done processing this partition. + Partition* null_aware_partition_; + + /// Populated during the hash table building phase if any partitions spilled. + /// One probe stream per spilled partition is prepared for writing so that the + /// initial write buffer is allocated. + /// + /// These streams are handed off to PartitionedHashJoinNode for use in buffering + /// spilled probe rows. The allocation is done in the builder so that it can divide + /// memory between the in-memory build partitions and write buffers based on the size + /// of the partitions and available memory. E.g. if all the partitions fit in memory, no + /// write buffers need to be allocated, but if some partitions are spilled, more build + /// partitions may be spilled to free up memory for write buffers. + /// + /// Because of this, at the end of the build phase, we always have sufficient memory + /// to execute the probe phase of the algorithm without spilling more partitions. + std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_; + + /// END: Members that must be Reset() + ///////////////////////////////////////// + + /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available + /// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is + /// used for subsequent levels. + typedef Status (*ProcessBuildBatchFn)( + PhjBuilder*, RowBatch*, HashTableCtx*, bool build_filters); + /// Jitted ProcessBuildBatch function pointers. NULL if codegen is disabled. + ProcessBuildBatchFn process_build_batch_fn_; + ProcessBuildBatchFn process_build_batch_fn_level0_; + + typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*, + const std::vector<BufferedTupleStream::RowIdx>&); + /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled. + InsertBatchFn insert_batch_fn_; + InsertBatchFn insert_batch_fn_level0_; +}; +} + +#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc index bab1bf8..44cb14b 100644 --- a/be/src/exec/partitioned-hash-join-node-ir.cc +++ b/be/src/exec/partitioned-hash-join-node-ir.cc @@ -19,10 +19,7 @@ #include "codegen/impala-ir.h" #include "exec/hash-table.inline.h" -#include "runtime/raw-value.inline.h" #include "runtime/row-batch.h" -#include "runtime/runtime-filter.h" -#include "util/bloom-filter.h" #include "common/names.h" @@ -151,12 +148,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins( // should interpret the hash table probe as "unknown" if there are nulls on the // build side. For those rows, we need to process the remaining join // predicates later. - if (null_aware_partition_->build_rows()->num_rows() != 0) { - if (num_other_join_conjuncts > 0 && - UNLIKELY(!AppendRow(null_aware_partition_->probe_rows(), - current_probe_row_, status))) { - DCHECK(!status->ok()); - return false; + if (builder_->null_aware_partition()->build_rows()->num_rows() != 0) { + if (num_other_join_conjuncts > 0) { + *status = AppendProbeRow( + null_aware_probe_partition_->probe_rows(), current_probe_row_); + if (UNLIKELY(!status->ok())) return false; } return true; } @@ -271,7 +267,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( // Fetch the hash and expr values' nullness for this row. if (expr_vals_cache->IsRowNull()) { - if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) { + if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && builder_->non_empty_build()) { const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size(); // For NAAJ, we need to treat NULLs on the probe carefully. The logic is: // 1. No build rows -> Return this row. The check for 'non_empty_build_' @@ -284,14 +280,12 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( if (num_other_join_conjuncts == 0) { // Condition 2 above. skip_row = true; - } else if (LIKELY(AppendRow(null_probe_rows_, current_probe_row_, status))) { + } else { // Condition 3 above. + *status = AppendProbeRow(null_probe_rows_.get(), current_probe_row_); + if (UNLIKELY(!status->ok())) return false; matched_null_probe_.push_back(false); skip_row = true; - } else { - // Condition 3 above but failed to append to 'null_probe_rows_'. Bail out. - DCHECK(!status->ok()); - return false; } } } else { @@ -300,20 +294,20 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( hash_tbl_iterator_ = hash_tbl->FindProbeRow(ht_ctx); } else { // The build partition is either empty or spilled. - Partition* partition = hash_partitions_[partition_idx]; - // This partition is closed, meaning the build side for this partition was empty. - if (UNLIKELY(partition->is_closed())) { - DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING); + PhjBuilder::Partition* build_partition = builder_->hash_partition(partition_idx); + ProbePartition* probe_partition = probe_hash_partitions_[partition_idx].get(); + DCHECK((build_partition->IsClosed() && probe_partition == NULL) + || (build_partition->is_spilled() && probe_partition != NULL)); + + if (UNLIKELY(probe_partition == NULL)) { + // A closed partition implies that the build side for this partition was empty. } else { - // This partition is not in memory, spill the probe row and move to the next row. - DCHECK(partition->is_spilled()); - DCHECK(partition->probe_rows() != NULL); + // The partition is not in memory, spill the probe row and move to the next row. // Skip the current row if we manage to append to the spilled partition's BTS. // Otherwise, we need to bail out and report the failure. - if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) { - DCHECK(!status->ok()); - return false; - } + BufferedTupleStream* probe_rows = probe_partition->probe_rows(); + *status = AppendProbeRow(probe_rows, current_probe_row_); + if (UNLIKELY(!status->ok())) return false; skip_row = true; } } @@ -358,9 +352,11 @@ void IR_ALWAYS_INLINE PartitionedHashJoinNode::EvalAndHashProbePrefetchGroup( } // CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by codegen. -template<int const JoinOp> +template <int const JoinOp> int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) { + DCHECK(state_ == PARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION + || state_ == REPARTITIONING_PROBE); ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0]; const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size(); ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; @@ -430,82 +426,15 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode return num_rows_added; } -Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch, - bool build_filters) { - HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx_->expr_values_cache(); - expr_vals_cache->Reset(); - FOREACH_ROW(build_batch, 0, build_batch_iter) { - DCHECK(build_status_.ok()); - TupleRow* build_row = build_batch_iter.Get(); - if (!ht_ctx_->EvalAndHashBuild(build_row)) { - if (null_aware_partition_ != NULL) { - // TODO: remove with codegen/template - // If we are NULL aware and this build row has NULL in the eq join slot, - // append it to the null_aware partition. We will need it later. - if (UNLIKELY(!AppendRow(null_aware_partition_->build_rows(), - build_row, &build_status_))) { - return build_status_; - } - } - continue; - } - if (build_filters) { - DCHECK_EQ(ht_ctx_->level(), 0) - << "Runtime filters should not be built during repartitioning."; - for (const FilterContext& ctx: filters_) { - // TODO: codegen expr evaluation and hashing - if (ctx.local_bloom_filter == NULL) continue; - void* e = ctx.expr->GetValue(build_row); - uint32_t filter_hash = RawValue::GetHashValue(e, ctx.expr->root()->type(), - RuntimeFilterBank::DefaultHashSeed()); - ctx.local_bloom_filter->Insert(filter_hash); - } - } - const uint32_t hash = expr_vals_cache->CurExprValuesHash(); - const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); - Partition* partition = hash_partitions_[partition_idx]; - const bool result = AppendRow(partition->build_rows(), build_row, &build_status_); - if (UNLIKELY(!result)) return build_status_; - } - return Status::OK(); -} - -bool PartitionedHashJoinNode::Partition::InsertBatch( - TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch, - const vector<BufferedTupleStream::RowIdx>& indices) { - // Compute the hash values and prefetch the hash table buckets. - const int num_rows = batch->num_rows(); - HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache(); - const int prefetch_size = expr_vals_cache->capacity(); - const BufferedTupleStream::RowIdx* row_indices = indices.data(); - for (int prefetch_group_row = 0; prefetch_group_row < num_rows; - prefetch_group_row += prefetch_size) { - int cur_row = prefetch_group_row; - expr_vals_cache->Reset(); - FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) { - if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) { - if (prefetch_mode != TPrefetchMode::NONE) { - hash_tbl_->PrefetchBucket<false>(expr_vals_cache->CurExprValuesHash()); - } - } else { - expr_vals_cache->SetRowNull(); - } - expr_vals_cache->NextRow(); - } - // Do the insertion. - expr_vals_cache->ResetForRead(); - FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) { - TupleRow* row = batch_iter.Get(); - BufferedTupleStream::RowIdx row_idx = row_indices[cur_row]; - if (!expr_vals_cache->IsRowNull() && - UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) { - return false; - } - expr_vals_cache->NextRow(); - ++cur_row; - } - } - return true; +inline Status PartitionedHashJoinNode::AppendProbeRow( + BufferedTupleStream* stream, TupleRow* row) { + DCHECK(stream->has_write_block()); + DCHECK(!stream->using_small_buffers()); + DCHECK(!stream->is_pinned()); + Status status; + if (LIKELY(stream->AddRow(row, &status))) return Status::OK(); + DCHECK(!status.ok()); + return status; } template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(
