Repository: incubator-impala Updated Branches: refs/heads/master b7d107a69 -> 241c7e019
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index 36dfce6..9827788 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -20,46 +20,88 @@ #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H #include <boost/scoped_ptr.hpp> -#include <boost/unordered_set.hpp> #include <boost/thread.hpp> +#include <list> +#include <memory> #include <string> #include "exec/blocking-join-node.h" #include "exec/exec-node.h" -#include "exec/filter-context.h" -#include "exec/hash-table.h" +#include "exec/partitioned-hash-join-builder.h" #include "runtime/buffered-block-mgr.h" -#include "gen-cpp/PlanNodes_types.h" // for TJoinOp +#include "gen-cpp/Types_types.h" namespace impala { class BloomFilter; class BufferedBlockMgr; +class BufferedTupleStream; class MemPool; class RowBatch; class RuntimeFilter; class TupleRow; -class BufferedTupleStream; -/// Operator to perform partitioned hash join, spilling to disk as necessary. -/// A spilled partition is one that is not fully pinned. -/// The operator runs in these distinct phases: -/// 1. Consume all build input and partition them. No hash tables are maintained. -/// 2. Construct hash tables from as many partitions as possible. -/// 3. Consume all the probe rows. Rows belonging to partitions that are spilled -/// must be spilled as well. -/// 4. Iterate over the spilled partitions, construct the hash table from the spilled -/// build rows and process the spilled probe rows. If the partition is still too -/// big, repeat steps 1-4, using this spilled partitions build and probe rows as -/// input. -// +/// Operator to perform partitioned hash join, spilling to disk as necessary. This +/// operator implements multiple join modes with the same code algorithm. 
+/// +/// The high-level algorithm is as follows: +/// 1. Consume all build input and partition it. No hash tables are maintained. +/// 2. Construct hash tables for as many unspilled partitions as possible. +/// 3. Consume the probe input. Each probe row is hashed to find the corresponding build +/// partition. If the build partition is in-memory (i.e. not spilled), then the +/// partition's hash table is probed and any matching rows can be outputted. If the +/// build partition is spilled, the probe row must also be spilled for later +/// processing. +/// 4. Any spilled partitions are processed. If the build rows and hash table for a +/// spilled partition fit in memory, the spilled partition is brought into memory +/// and its spilled probe rows are processed. Otherwise the spilled partition must be +/// repartitioned into smaller partitions. Repartitioning repeats steps 1-3 above, +/// except with the partition's spilled build and probe rows as input. +/// +/// IMPLEMENTATION DETAILS: +/// ----------------------- +/// The partitioned hash join algorithm is implemented with the PartitionedHashJoinNode +/// and PhjBuilder classes. Each join node has a builder (see PhjBuilder) that +/// partitions, stores and builds hash tables over the build rows. +/// +/// The above algorithm is implemented as a state machine with the following phases: +/// +/// 1. [PARTITIONING_BUILD or REPARTITIONING_BUILD] Read build rows from child(1) OR +/// from the spilled build rows of a partition and partition them into the builder's +/// hash partitions. If there is sufficient memory, all build partitions are kept +/// in memory. Otherwise, build partitions are spilled as needed to free up memory. +/// Finally, build a hash table for each in-memory partition and create a probe +/// partition with a write buffer for each spilled partition. 
+/// +/// After the phase, the algorithm advances from PARTITIONING_BUILD to +/// PARTITIONING_PROBE or from REPARTITIONING_BUILD to REPARTITIONING_PROBE. +/// +/// 2. [PARTITIONING_PROBE or REPARTITIONING_PROBE] Read the probe rows from child(0) or +/// the spilled probe rows of a partition and partition them. If a probe row's +/// partition is in memory, probe the partition's hash table, otherwise spill the +/// probe row. Finally, output unmatched build rows for join modes that require it. +/// +/// After the phase, the algorithm terminates if no spilled partitions remain or +/// continues to process one of the remaining spilled partitions by advancing to +/// either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether +/// the spilled partition's hash table fits in memory or not. +/// +/// 3. [PROBING_SPILLED_PARTITION] Read the probe rows from a spilled partition that +/// was brought back into memory and probe the partition's hash table. Finally, +/// output unmatched build rows for join modes that require it. +/// +/// After the phase, the algorithm terminates if no spilled partitions remain or +/// continues to process one of the remaining spilled partitions by advancing to +/// either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether +/// the spilled partition's hash table fits in memory or not. +/// +/// Null aware anti-join (NAAJ) extends the above algorithm by accumulating rows with +/// NULLs into several different streams, which are processed in a separate step to +/// produce additional output rows. The NAAJ algorithm is documented in more detail in +/// header comments for the null aware functions and data structures. +/// +/// TODO: don't copy tuple rows so often. -/// TODO: we need multiple hash functions. Each repartition needs new hash functions -/// or new bits. Multiplicative hashing? -/// TODO: think about details about multithreading. Multiple partitions in parallel? 
-/// Multiple threads against a single partition? How to build hash tables in parallel? -/// TODO: BuildHashTables() should start with the partitions that are already pinned. class PartitionedHashJoinNode : public BlockingJoinNode { public: PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode, @@ -78,102 +120,53 @@ class PartitionedHashJoinNode : public BlockingJoinNode { virtual Status ProcessBuildInput(RuntimeState* state); private: - class Partition; - - /// Implementation details: - /// Logically, the algorithm runs in three modes. - /// 1. [PARTITIONING_BUILD or REPARTITIONING] Read the build side rows and partition - /// them into hash_partitions_. This is a fixed fan out of the input. The input - /// can either come from child(1) OR from the build tuple stream of partition - /// that needs to be repartitioned. - /// 2. [PROCESSING_PROBE or REPARTITIONING] Read the probe side rows, partition them - /// and either perform the join or spill them into hash_partitions_. If the - /// partition has the hash table in memory, we perform the join, otherwise we - /// spill the probe row. Similar to step one, the rows can come from child(0) or - /// a spilled partition. - /// 3. [PROBING_SPILLED_PARTITION] Read and construct a single spilled partition. - /// In this case we are walking a spilled partition and the hash table fits in - /// memory. Neither the build nor probe side need to be partitioned and we just - /// perform the join. - /// - /// States: - /// The transition goes from PARTITIONING_BUILD -> PROCESSING_PROBE -> - /// PROBING_SPILLED_PARTITION/REPARTITIONING. - /// The last two steps will switch back and forth as many times as we need to - /// repartition. + class ProbePartition; + enum HashJoinState { - /// Partitioning the build (right) child's input. Corresponds to mode 1 above but - /// only when consuming from child(1). + /// Partitioning the build (right) child's input into the builder's hash partitions. 
PARTITIONING_BUILD, - /// Processing the probe (left) child's input. Corresponds to mode 2 above but - /// only when consuming from child(0). - PROCESSING_PROBE, + /// Processing the probe (left) child's input, probing hash tables and + /// spilling probe rows into 'probe_hash_partitions_' if necessary. + PARTITIONING_PROBE, - /// Probing a spilled partition. The hash table for this partition fits in memory. - /// Corresponds to mode 3. + /// Processing the spilled probe rows of a single spilled partition + /// ('input_partition_') that fits in memory. PROBING_SPILLED_PARTITION, - /// Repartitioning a single spilled partition (input_partition_) into - /// hash_partitions_. - /// Corresponds to mode 1 & 2 but reading from a spilled partition. - REPARTITIONING, + /// Repartitioning the build rows of a single spilled partition ('input_partition_') + /// into the builder's hash partitions. + /// Corresponds to PARTITIONING_BUILD but reading from a spilled partition. + REPARTITIONING_BUILD, + + /// Probing the repartitioned hash partitions of a single spilled partition + /// ('input_partition_') with the probe rows of that partition. + /// Corresponds to PARTITIONING_PROBE but reading from a spilled partition. + REPARTITIONING_PROBE, }; - /// Number of initial partitions to create. Must be a power of two. - /// TODO: this is set to a lower than actual value for testing. - static const int PARTITION_FANOUT = 16; - - /// Needs to be the log(PARTITION_FANOUT) - static const int NUM_PARTITIONING_BITS = 4; - - /// Maximum number of times we will repartition. The maximum build table we - /// can process is: - /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). With a (low) 1GB - /// limit and 64 fanout, we can support 256TB build tables in the case where - /// there is no skew. - /// In the case where there is skew, repartitioning is unlikely to help (assuming a - /// reasonable hash function). 
- /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx. - /// TODO: we can revisit and try harder to explicitly detect skew. - static const int MAX_PARTITION_DEPTH = 16; - - /// Append the row to stream. In the common case, the row is just in memory and the - /// append succeeds. If the append fails, we fallback to the slower path of - /// AppendRowStreamFull(). - /// Returns true if the row was added and false otherwise. If false is returned, - /// *status contains the error (doesn't return status because this is very perf - /// sensitive). - bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status); - - /// Slow path for AppendRow() above. It is called when the stream has failed to append - /// the row. We need to find more memory by either switching to IO-buffers, in case the - /// stream still uses small buffers, or spilling a partition. - bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row, Status* status); - - /// Called when we need to free up memory by spilling a partition. - /// This function walks hash_partitions_ and picks one to spill. - /// *spilled_partition is the partition that was spilled. - /// Returns non-ok status if we couldn't spill a partition. - Status SpillPartition(Partition** spilled_partition); - - /// Partitions the entire build input (either from child(1) or input_partition_) into - /// hash_partitions_. When this call returns, hash_partitions_ is ready to consume - /// the probe input. - /// 'level' is the level new partitions (in hash_partitions_) should be created with. - Status ProcessBuildInput(RuntimeState* state, int level); - - /// Reads the rows in build_batch and partitions them in hash_partitions_. If - /// 'build_filters' is true, runtime filters are populated. 
- Status ProcessBuildBatch(RowBatch* build_batch, bool build_filters); - - /// Call at the end of partitioning the build rows (which could be from the build child - /// or from repartitioning an existing partition). After this function returns, all - /// partitions in hash_partitions_ are ready to accept probe rows. This function - /// constructs hash tables for as many partitions as fit in memory (which can be none). - /// For the remaining partitions, this function initializes the probe spilling - /// structures. - Status BuildHashTables(RuntimeState* state); + /// Constants from PhjBuilder, added to this node for convenience. + static const int PARTITION_FANOUT = PhjBuilder::PARTITION_FANOUT; + static const int NUM_PARTITIONING_BITS = PhjBuilder::NUM_PARTITIONING_BITS; + static const int MAX_PARTITION_DEPTH = PhjBuilder::MAX_PARTITION_DEPTH; + + /// Initialize 'probe_hash_partitions_' and 'hash_tbls_' before probing. One probe + /// partition is created per spilled build partition, and 'hash_tbls_' is initialized + /// with pointers to the hash tables of in-memory partitions and NULL pointers for + /// spilled or closed partitions. + /// Called after the builder has partitioned the build rows and built hash tables, + /// either in the initial build step, or after repartitioning a spilled partition. + /// After this function returns, all partitions are ready to process probe rows. + Status PrepareForProbe(); + + /// Creates an initialized probe partition at 'partition_idx' in + /// 'probe_hash_partitions_'. + void CreateProbePartition( + int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows); + + /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have + /// a write buffer allocated, so this will succeed unless an error is encountered. 
+ Status AppendProbeRow(BufferedTupleStream* stream, TupleRow* row); /// Probes the hash table for rows matching the current probe row and appends /// all the matching build rows (with probe row) to output batch. Returns true @@ -292,6 +285,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// new partition at the front of output_build_partitions_. void OutputUnmatchedBuild(RowBatch* out_batch); + /// Initializes 'null_aware_probe_partition_' and prepares its probe stream for writing. + Status InitNullAwareProbePartition(); + + /// Initializes 'null_probe_rows_' and prepares that stream for writing. + Status InitNullProbeRows(); + /// Initializes null_aware_partition_ and nulls_build_batch_ to output rows. Status PrepareNullAwarePartition(); @@ -313,155 +312,75 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// false. Used for NAAJ. Status OutputNullAwareNullProbe(RuntimeState* state, RowBatch* out_batch); - /// Call at the end of consuming the probe rows. Walks hash_partitions_ and - /// - If this partition had a hash table, close it. This partition is fully processed - /// on both the build and probe sides. The streams are transferred to batch. + /// Call at the end of consuming the probe rows. Cleans up the build and probe hash + /// partitions and: + /// - If the build partition had a hash table, close it. The build and probe + /// partitions are fully processed. The streams are transferred to 'batch'. /// In the case of right-outer and full-outer joins, instead of closing this /// partition we put it on a list of partitions for which we need to flush their /// unmatched rows. - /// - If this partition did not have a hash table, meaning both sides were spilled, - /// move the partition to spilled_partitions_. + /// - If the build partition did not have a hash table, meaning both build and probe + /// rows were spilled, move the partition to 'spilled_partitions_'. 
Status CleanUpHashPartitions(RowBatch* batch); /// Get the next row batch from the probe (left) side (child(0)). If we are done - /// consuming the input, sets probe_batch_pos_ to -1, otherwise, sets it to 0. - Status NextProbeRowBatch(RuntimeState*, RowBatch* out_batch); + /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0. + Status NextProbeRowBatch(RuntimeState* state, RowBatch* out_batch); - /// Get the next probe row batch from input_partition_. If we are done consuming the - /// input, sets probe_batch_pos_ to -1, otherwise, sets it to 0. - Status NextSpilledProbeRowBatch(RuntimeState*, RowBatch* out_batch); + /// Get the next probe row batch from 'input_partition_'. If we are done consuming the + /// input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0. + Status NextSpilledProbeRowBatch(RuntimeState* state, RowBatch* out_batch); - /// Moves onto the next spilled partition and initializes input_partition_. This - /// function processes the entire build side of input_partition_ and when this function - /// returns, we are ready to consume the probe side of input_partition_. + /// Moves onto the next spilled partition and initializes 'input_partition_'. This + /// function processes the entire build side of 'input_partition_' and when this + /// function returns, we are ready to consume the probe side of 'input_partition_'. /// If the build side's hash table fits in memory, we will construct input_partition_'s /// hash table. If it does not, meaning we need to repartition, this function will - /// initialize hash_partitions_. - Status PrepareNextPartition(RuntimeState*); - - /// Iterates over all the partitions in hash_partitions_ and returns the number of rows - /// of the largest partition (in terms of number of build and probe rows). - int64_t LargestSpilledPartition() const; + /// repartition the build rows into 'builder->hash_partitions_' and prepare for + /// repartitioning the partition's probe rows. 
+ Status PrepareSpilledPartitionForProbe(RuntimeState* state, bool* got_partition); - /// Calls Close() on every Partition in 'hash_partitions_', - /// 'spilled_partitions_', and 'output_build_partitions_' and then resets the lists, - /// the vector and the partition pool. - void ClosePartitions(); + /// Calls Close() on every probe partition, destroys the partitions and cleans up any + /// references to the partitions. Also closes and destroys 'null_probe_rows_'. + void CloseAndDeletePartitions(); /// Prepares for probing the next batch. void ResetForProbe(); - /// For each filter in filters_, allocate a bloom_filter from the fragment-local - /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build - /// phase. - void AllocateRuntimeFilters(RuntimeState* state); - - /// Publish the runtime filters to the fragment-local - /// RuntimeFilterBank. 'total_build_rows' is used to determine whether the computed - /// filters have an unacceptably high false-positive rate. - void PublishRuntimeFilters(RuntimeState* state, int64_t total_build_rows); - /// Codegen function to create output row. Assumes that the probe row is non-NULL. Status CodegenCreateOutputRow(LlvmCodeGen* codegen, llvm::Function** fn); - /// Codegen processing build batches. Identical signature to ProcessBuildBatch. - /// Returns non-OK status if codegen was not possible. - Status CodegenProcessBuildBatch(RuntimeState* state, llvm::Function* hash_fn, - llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn); - /// Codegen processing probe batches. Identical signature to ProcessProbeBatch. /// Returns non-OK if codegen was not possible. - Status CodegenProcessProbeBatch( - RuntimeState* state, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn); - - /// Codegen inserting batches into a partition's hash table. Identical signature to - /// Partition::InsertBatch(). Returns non-OK if codegen was not possible. 
- Status CodegenInsertBatch(RuntimeState* state, llvm::Function* hash_fn, - llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn); + Status CodegenProcessProbeBatch(RuntimeState* state); /// Returns the current state of the partition as a string. std::string PrintState() const; - /// Updates state_ to 's', logging the transition. - void UpdateState(HashJoinState s); + /// Updates 'state_' to 'next_state', logging the transition. + void UpdateState(HashJoinState next_state); std::string NodeDebugString() const; - /// We need two output buffers per partition (one for build and one for probe) and - /// and two additional buffers for the input (while repartitioning; for the build and - /// probe sides). - /// For NAAJ, we need 3 additional buffers to maintain the null_aware_partition_. - int MinRequiredBuffers() const { - int num_reserved_buffers = PARTITION_FANOUT * 2 + 2; - num_reserved_buffers += join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0; - return num_reserved_buffers; - } - RuntimeState* runtime_state_; /// Our equi-join predicates "<lhs> = <rhs>" are separated into /// build_expr_ctxs_ (over child(1)) and probe_expr_ctxs_ (over child(0)) - std::vector<ExprContext*> probe_expr_ctxs_; std::vector<ExprContext*> build_expr_ctxs_; - - /// List of filters to build during build phase. - std::vector<FilterContext> filters_; - - /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS - /// NOT DISTINCT FROM, rather than equality. - std::vector<bool> is_not_distinct_from_; + std::vector<ExprContext*> probe_expr_ctxs_; /// Non-equi-join conjuncts from the ON clause. std::vector<ExprContext*> other_join_conjunct_ctxs_; - /// Codegen doesn't allow for automatic Status variables because then exception - /// handling code is needed to destruct the Status, and our function call substitution - /// doesn't know how to deal with the LLVM IR 'invoke' instruction. 
Workaround that by - /// placing the build-side status here so exceptions won't need to destruct it. - /// This status should used directly only by ProcesssBuildBatch(). - /// TODO: fix IMPALA-1948 and remove this. - Status build_status_; - - /// Client to the buffered block mgr. - BufferedBlockMgr::Client* block_mgr_client_; - /// Used for hash-related functionality, such as evaluating rows and calculating hashes. - /// TODO: If we want to multi-thread then this context should be thread-local and not - /// associated with the node. boost::scoped_ptr<HashTableCtx> ht_ctx_; /// The iterator that corresponds to the look up of current_probe_row_. HashTable::Iterator hash_tbl_iterator_; - /// Total time spent partitioning build. - RuntimeProfile::Counter* partition_build_timer_; - - /// Total number of hash buckets across all partitions. - RuntimeProfile::Counter* num_hash_buckets_; - - /// Total number of partitions created. - RuntimeProfile::Counter* partitions_created_; - - /// Level of max partition (i.e. number of repartitioning steps). - RuntimeProfile::HighWaterMarkCounter* max_partition_level_; - - /// Number of build/probe rows that have been partitioned. - RuntimeProfile::Counter* num_build_rows_partitioned_; + /// Number of probe rows that have been partitioned. RuntimeProfile::Counter* num_probe_rows_partitioned_; - /// Number of partitions that have been repartitioned. - RuntimeProfile::Counter* num_repartitions_; - - /// Number of partitions that have been spilled. - RuntimeProfile::Counter* num_spilled_partitions_; - - /// The largest fraction (of build side) after repartitioning. This is expected to be - /// 1 / PARTITION_FANOUT. A value much larger indicates skew. - RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_; - - /// Number of hash collisions - unequal rows that have identical hash values - RuntimeProfile::Counter* num_hash_collisions_; - /// Time spent evaluating other_join_conjuncts for NAAJ. 
RuntimeProfile::Counter* null_aware_eval_timer_; @@ -471,60 +390,52 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// State of the partitioned hash join algorithm. Used just for debugging. HashJoinState state_; - /// Object pool that holds the Partition objects in hash_partitions_. - boost::scoped_ptr<ObjectPool> partition_pool_; - - /// The current set of partitions that are being built. This is only used in - /// mode 1 and 2 when we need to partition the build and probe inputs. - /// This is not used when processing a single partition. - /// After CleanUpHashPartitions() is called this vector should be empty. - std::vector<Partition*> hash_partitions_; - - /// The list of partitions that have been spilled on both sides and still need more - /// processing. These partitions could need repartitioning, in which case more - /// partitions will be added to this list after repartitioning. - /// This list is populated at CleanUpHashPartitions(). - std::list<Partition*> spilled_partitions_; + /// The build-side of the join. Initialized in Init(). + boost::scoped_ptr<PhjBuilder> builder_; /// Cache of the per partition hash table to speed up ProcessProbeBatch. /// In the case where we need to partition the probe: - /// hash_tbls_[i] = hash_partitions_[i]->hash_tbl(); + /// hash_tbls_[i] = builder_->hash_partitions_[i]->hash_tbl(); /// In the case where we don't need to partition the probe: /// hash_tbls_[i] = input_partition_->hash_tbl(); HashTable* hash_tbls_[PARTITION_FANOUT]; - /// The current input partition to be processed (not in spilled_partitions_). - /// This partition can either serve as the source for a repartitioning step, or - /// if the hash table fits in memory, the source of the probe rows. - Partition* input_partition_; + /// Probe partitions, with indices corresponding to the build partitions in + /// builder_->hash_partitions(). 
This is non-empty only in the PARTITIONING_PROBE or + /// REPARTITIONING_PROBE states, in which case it has NULL entries for in-memory + /// build partitions and non-NULL entries for spilled build partitions (so that we + /// have somewhere to spill the probe rows for the spilled partition). + std::vector<std::unique_ptr<ProbePartition>> probe_hash_partitions_; + + /// The list of probe partitions that have been spilled and still need more + /// processing. These partitions could need repartitioning, in which case more + /// partitions will be added to this list after repartitioning. + /// This list is populated at CleanUpHashPartitions(). + std::list<std::unique_ptr<ProbePartition>> spilled_partitions_; + + /// The current spilled probe partition being processed as input to repartitioning, + /// or the source of the probe rows if the hash table fits in memory. + std::unique_ptr<ProbePartition> input_partition_; /// In the case of right-outer and full-outer joins, this is the list of the partitions /// for which we need to output their unmatched build rows. /// This list is populated at CleanUpHashPartitions(). - std::list<Partition*> output_build_partitions_; - - /// Partition used if null_aware_ is set. This partition is always processed at the end - /// after all build and probe rows are processed. Rows are added to this partition along - /// the way. - /// In this partition's build_rows_, we store all the rows for which build_expr_ctxs_ - /// evaluated over the row returns NULL (i.e. it has a NULL on the eq join slot). - /// In this partition's probe_rows, we store all probe rows that did not have a match - /// in the hash table. - /// At the very end, we then iterate over all the probe rows. For each probe row, we - /// return the rows that did not match any of the build rows. - /// NULL if we this join is not null aware or we are done processing this partition. 
- Partition* null_aware_partition_; + std::list<PhjBuilder::Partition*> output_build_partitions_; /// Used while processing null_aware_partition_. It contains all the build tuple rows /// with a NULL when evaluating the hash table expr. boost::scoped_ptr<RowBatch> nulls_build_batch_; - /// If true, the build side has at least one row. - bool non_empty_build_; + /// Partition used if 'null_aware_' is set. During probing, rows from the probe + /// side that did not have a match in the hash table are appended to this partition. + /// At the very end, we then iterate over the partition's probe rows. For each probe + /// row, we return the rows that did not match any of the partition's build rows. This + /// is NULL if this join is not null aware or we are done processing this partition. + boost::scoped_ptr<ProbePartition> null_aware_probe_partition_; /// For NAAJ, this stream contains all probe rows that had NULL on the hash table - /// conjuncts. - BufferedTupleStream* null_probe_rows_; + /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches. + std::unique_ptr<BufferedTupleStream> null_probe_rows_; /// For each row in null_probe_rows_, true if this row has matched any build row /// (i.e. the resulting joined row passes other_join_conjuncts). @@ -538,99 +449,55 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// END: Members that must be Reset() ///////////////////////////////////////// - class Partition { + /// The probe-side partition corresponding to a build partition. The probe partition + /// is created when a build partition is spilled so that probe rows can be spilled to + /// disk for later processing. 
+ class ProbePartition { public: - Partition(RuntimeState* state, PartitionedHashJoinNode* parent, int level); - ~Partition(); - - BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_; } - BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_; } - HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); } - - bool ALWAYS_INLINE is_closed() const { return is_closed_; } - bool ALWAYS_INLINE is_spilled() const { return is_spilled_; } - - /// Must be called once per partition to release any resources. This should be called - /// as soon as possible to release memory. - /// If batch is non-null, the build and probe streams are attached to the batch, - /// transferring ownership to them. + /// Create a new probe partition. 'probe_rows' should be an empty unpinned stream + /// that has been prepared for writing with an I/O-sized write buffer. + ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent, + PhjBuilder::Partition* build_partition, + std::unique_ptr<BufferedTupleStream> probe_rows); + ~ProbePartition(); + + /// Prepare to read the probe rows. Allocates the first read block, so reads will + /// not fail with out of memory if this succeeds. Returns an error if the first read + /// block cannot be acquired. "delete_on_read" mode is used, so the blocks backing + /// the buffered tuple stream will be destroyed after reading. + Status PrepareForRead(); + + /// Close the partition and attach resources to 'batch' if non-NULL or free the + /// resources if 'batch' is NULL. Idempotent. void Close(RowBatch* batch); - /// Returns the estimated size of the in memory size for the build side of this - /// partition. This includes the entire build side and the hash table. - int64_t EstimatedInMemSize() const; - - /// Pins the build tuples for this partition and constructs the hash_tbl_ from it. - /// Build rows cannot be added after calling this. 
- /// If the partition could not be built due to memory pressure, *built is set to false - /// and the caller is responsible for spilling this partition. - Status BuildHashTable(RuntimeState* state, bool* built); + BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); } + PhjBuilder::Partition* build_partition() { return build_partition_; } - /// Spills this partition, cleaning up and unpinning blocks. - /// If 'unpin_all_build' is true, the build stream is completely unpinned, otherwise, - /// it is unpinned with one buffer remaining. - Status Spill(bool unpin_all_build); + inline bool IsClosed() const { return probe_rows_ == NULL; } private: - friend class PartitionedHashJoinNode; - - /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array - /// containing the index of each row's index into the hash table's tuple stream. - /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash - /// table buckets which the rows hashes to will be prefetched. This parameter is - /// replaced with a constant during codegen time. This function may be replaced with - /// a codegen'd version. Returns true if all rows in 'batch' are successfully - /// inserted. - bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx, - RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices); - PartitionedHashJoinNode* parent_; - /// This partition is completely processed and nothing needs to be done for it again. - /// All resources associated with this partition are returned. - bool is_closed_; - - /// True if this partition is spilled. - bool is_spilled_; - - /// How many times rows in this partition have been repartitioned. Partitions created - /// from the node's children's input is level 0, 1 after the first repartitionining, - /// etc. - int level_; + /// The corresponding build partition. Not NULL. Owned by PhjBuilder. 
+ PhjBuilder::Partition* build_partition_; - /// The hash table for this partition. - boost::scoped_ptr<HashTable> hash_tbl_; - - /// Stream of build/probe tuples in this partition. Allocated from the runtime state's - /// object pool. Initially owned by this object (meaning it has to call Close() on it) - /// but transferred to the parent exec node (via the row batch) when the partition - /// is complete. - /// If NULL, ownership has been transfered. - BufferedTupleStream* build_rows_; - BufferedTupleStream* probe_rows_; + /// Stream of probe tuples in this partition. Initially owned by this object but + /// transferred to the parent exec node (via the row batch) when the partition + /// is complete. If NULL, ownership was transferred and the partition is closed. + std::unique_ptr<BufferedTupleStream> probe_rows_; }; /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available /// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is /// used for subsequent levels. - typedef Status (*ProcessBuildBatchFn)(PartitionedHashJoinNode*, RowBatch*, - bool build_filters); - /// Jitted ProcessBuildBatch function pointers. NULL if codegen is disabled. - ProcessBuildBatchFn process_build_batch_fn_; - ProcessBuildBatchFn process_build_batch_fn_level0_; - typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*, TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*); /// Jitted ProcessProbeBatch function pointers. NULL if codegen is disabled. ProcessProbeBatchFn process_probe_batch_fn_; ProcessProbeBatchFn process_probe_batch_fn_level0_; - typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, - RowBatch*, const std::vector<BufferedTupleStream::RowIdx>&); - /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled. 
- InsertBatchFn insert_batch_fn_; - InsertBatchFn insert_batch_fn_level0_; }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.inline.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h index ffeeb45..a53b40e 100644 --- a/be/src/exec/partitioned-hash-join-node.inline.h +++ b/be/src/exec/partitioned-hash-join-node.inline.h @@ -32,12 +32,6 @@ inline void PartitionedHashJoinNode::ResetForProbe() { ht_ctx_->expr_values_cache()->Reset(); } -inline bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream, - TupleRow* row, Status* status) { - if (LIKELY(stream->AddRow(row, status))) return true; - return AppendRowStreamFull(stream, row, status); -} - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-block-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index b2c3973..bf7b595 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -15,15 +15,16 @@ // specific language governing permissions and limitations // under the License. 
-#include "runtime/runtime-state.h" -#include "runtime/mem-tracker.h" -#include "runtime/mem-pool.h" #include "runtime/buffered-block-mgr.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" +#include "runtime/runtime-state.h" #include "runtime/tmp-file-mgr.h" -#include "util/runtime-profile-counters.h" +#include "util/debug-util.h" #include "util/disk-info.h" #include "util/filesystem-util.h" #include "util/impalad-metrics.h" +#include "util/runtime-profile-counters.h" #include "util/uid-util.h" #include <openssl/rand.h> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 8d07584..20af23e 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -290,8 +290,13 @@ class SimpleTupleStreamTest : public testing::Test { BufferedTupleStream stream(runtime_state_, *desc, runtime_state_->block_mgr(), client_, use_small_buffers, false); ASSERT_OK(stream.Init(-1, NULL, true)); + bool got_write_buffer; + ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); + ASSERT_TRUE(got_write_buffer); - if (unpin_stream) ASSERT_OK(stream.UnpinStream()); + if (unpin_stream) { + ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); + } // Add rows to the stream int offset = 0; for (int i = 0; i < num_batches; ++i) { @@ -339,10 +344,15 @@ class SimpleTupleStreamTest : public testing::Test { client_, small_buffers == 0, // initial small buffers true); // read_write ASSERT_OK(stream.Init(-1, NULL, true)); + bool got_write_buffer; + ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); + ASSERT_TRUE(got_write_buffer); bool got_read_buffer; ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer)); ASSERT_TRUE(got_read_buffer); - if (unpin_stream) 
ASSERT_OK(stream.UnpinStream()); + if (unpin_stream) { + ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); + } vector<int> results; @@ -555,6 +565,9 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) { BufferedTupleStream stream(runtime_state_, *row_desc, runtime_state_->block_mgr(), client_, true, false); ASSERT_OK(stream.Init(-1, NULL, true)); + bool got_write_buffer; + ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); + ASSERT_TRUE(got_write_buffer); int offset = 0; bool full = false; @@ -571,7 +584,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) { offset += j; } - ASSERT_OK(stream.UnpinStream()); + ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); bool pinned = false; ASSERT_OK(stream.PinStream(false, &pinned)); @@ -624,6 +637,9 @@ TEST_F(SimpleTupleStreamTest, SmallBuffers) { BufferedTupleStream stream(runtime_state_, *int_desc_, runtime_state_->block_mgr(), client_, true, false); ASSERT_OK(stream.Init(-1, NULL, false)); + bool got_write_buffer; + ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); + ASSERT_TRUE(got_write_buffer); // Initial buffer should be small. 
EXPECT_LT(stream.bytes_in_mem(false), buffer_size); @@ -849,6 +865,9 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) { BufferedTupleStream stream(runtime_state_, *string_desc_, runtime_state_->block_mgr(), client_, false, false); ASSERT_OK(stream.Init(-1, NULL, false)); + bool got_write_buffer; + ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); + ASSERT_TRUE(got_write_buffer); for (int i = 0; i < num_batches; ++i) { RowBatch* batch = CreateStringBatch(rows_added, 1, false); @@ -1005,6 +1024,10 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) { int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]); int array_len_index = 0; ASSERT_OK(stream.Init(-1, NULL, false)); + bool got_write_buffer; + ASSERT_OK(stream.PrepareForWrite(&got_write_buffer)); + ASSERT_TRUE(got_write_buffer); + for (int i = 0; i < NUM_ROWS; ++i) { int expected_row_size = tuple_descs[0]->byte_size() + tuple_descs[1]->byte_size(); gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc index 978a221..b2fce45 100644 --- a/be/src/runtime/buffered-tuple-stream.cc +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -120,9 +120,8 @@ BufferedTupleStream::~BufferedTupleStream() { // num_pinned_. 
int NumPinned(const list<BufferedBlockMgr::Block*>& blocks) { int num_pinned = 0; - for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks.begin(); - it != blocks.end(); ++it) { - if ((*it)->is_pinned() && (*it)->is_max_size()) ++num_pinned; + for (BufferedBlockMgr::Block* block : blocks) { + if (block->is_pinned() && block->is_max_size()) ++num_pinned; } return num_pinned; } @@ -141,10 +140,9 @@ string BufferedTupleStream::DebugString() const { ss << *read_block_; } ss << " blocks=[\n"; - for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin(); - it != blocks_.end(); ++it) { - ss << "{" << (*it)->DebugString() << "}"; - if (*it != blocks_.back()) ss << ",\n"; + for (BufferedBlockMgr::Block* block : blocks_) { + ss << "{" << block->DebugString() << "}"; + if (block != blocks_.back()) ss << ",\n"; } ss << "]"; return ss.str(); @@ -169,15 +167,15 @@ Status BufferedTupleStream::Init(int node_id, RuntimeProfile* profile, bool pinn if (block_mgr_->max_block_size() < INITIAL_BLOCK_SIZES[0]) { use_small_buffers_ = false; } - - bool got_block; - RETURN_IF_ERROR(NewWriteBlockForRow(fixed_tuple_row_size_, &got_block)); - if (!got_block) return block_mgr_->MemLimitTooLowError(block_mgr_client_, node_id); - DCHECK(write_block_ != NULL); - if (!pinned) RETURN_IF_ERROR(UnpinStream()); + if (!pinned) RETURN_IF_ERROR(UnpinStream(UNPIN_ALL_EXCEPT_CURRENT)); return Status::OK(); } +Status BufferedTupleStream::PrepareForWrite(bool* got_buffer) { + DCHECK(write_block_ == NULL); + return NewWriteBlockForRow(fixed_tuple_row_size_, got_buffer); +} + Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) { if (!use_small_buffers_) { *got_buffer = (write_block_ != NULL); @@ -194,9 +192,8 @@ Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) { } void BufferedTupleStream::Close() { - for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin(); - it != blocks_.end(); ++it) { - (*it)->Delete(); + for (BufferedBlockMgr::Block* block : 
blocks_) { + block->Delete(); } blocks_.clear(); num_pinned_ = 0; @@ -206,12 +203,11 @@ void BufferedTupleStream::Close() { int64_t BufferedTupleStream::bytes_in_mem(bool ignore_current) const { int64_t result = 0; - for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin(); - it != blocks_.end(); ++it) { - if (!(*it)->is_pinned()) continue; - if (!(*it)->is_max_size()) continue; - if (*it == write_block_ && ignore_current) continue; - result += (*it)->buffer_len(); + for (BufferedBlockMgr::Block* block : blocks_) { + if (!block->is_pinned()) continue; + if (!block->is_max_size()) continue; + if (block == write_block_ && ignore_current) continue; + result += block->buffer_len(); } return result; } @@ -394,13 +390,12 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer write_block_ = NULL; } - // Walk the blocks and pin the first non-io sized block. - for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin(); - it != blocks_.end(); ++it) { - if (!(*it)->is_pinned()) { + // Walk the blocks and pin the first IO-sized block. 
+ for (BufferedBlockMgr::Block* block : blocks_) { + if (!block->is_pinned()) { SCOPED_TIMER(pin_timer_); bool current_pinned; - RETURN_IF_ERROR((*it)->Pin(&current_pinned)); + RETURN_IF_ERROR(block->Pin(&current_pinned)); if (!current_pinned) { *got_buffer = false; return Status::OK(); @@ -408,7 +403,7 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer ++num_pinned_; DCHECK_EQ(num_pinned_, NumPinned(blocks_)); } - if ((*it)->is_max_size()) break; + if (block->is_max_size()) break; } read_block_ = blocks_.begin(); @@ -437,12 +432,11 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) { } } - for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin(); - it != blocks_.end(); ++it) { - if ((*it)->is_pinned()) continue; + for (BufferedBlockMgr::Block* block : blocks_) { + if (block->is_pinned()) continue; { SCOPED_TIMER(pin_timer_); - RETURN_IF_ERROR((*it)->Pin(pinned)); + RETURN_IF_ERROR(block->Pin(pinned)); } if (!*pinned) { VLOG_QUERY << "Should have been reserved." << endl @@ -457,9 +451,8 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) { // Populate block_start_idx_ on pin. 
DCHECK_EQ(block_start_idx_.size(), blocks_.size()); block_start_idx_.clear(); - for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin(); - it != blocks_.end(); ++it) { - block_start_idx_.push_back((*it)->buffer()); + for (BufferedBlockMgr::Block* block : blocks_) { + block_start_idx_.push_back(block->buffer()); } } *pinned = true; @@ -467,18 +460,20 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) { return Status::OK(); } -Status BufferedTupleStream::UnpinStream(bool all) { +Status BufferedTupleStream::UnpinStream(UnpinMode mode) { DCHECK(!closed_); + DCHECK(mode == UNPIN_ALL || mode == UNPIN_ALL_EXCEPT_CURRENT); SCOPED_TIMER(unpin_timer_); for (BufferedBlockMgr::Block* block: blocks_) { if (!block->is_pinned()) continue; - if (!all && (block == write_block_ || (read_write_ && block == *read_block_))) { + if (mode == UNPIN_ALL_EXCEPT_CURRENT + && (block == write_block_ || (read_write_ && block == *read_block_))) { continue; } RETURN_IF_ERROR(UnpinBlock(block)); } - if (all) { + if (mode == UNPIN_ALL) { read_block_ = blocks_.end(); write_block_ = NULL; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h index e14f16c..d3bfa81 100644 --- a/be/src/runtime/buffered-tuple-stream.h +++ b/be/src/runtime/buffered-tuple-stream.h @@ -228,6 +228,13 @@ class BufferedTupleStream { /// 'node_id' is only used for error reporting. Status Init(int node_id, RuntimeProfile* profile, bool pinned); + /// Prepares the stream for writing by attempting to allocate a write block. + /// Called after Init() and before the first AddRow() call. + /// 'got_buffer': set to true if the first write block was successfully pinned, or + /// false if the block could not be pinned and no error was encountered. 
Undefined + /// if an error status is returned. + Status PrepareForWrite(bool* got_buffer); + /// Must be called for streams using small buffers to switch to IO-sized buffers. /// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_ /// back to false. @@ -269,9 +276,20 @@ class BufferedTupleStream { /// block_mgr_client_ to pin the stream. Status PinStream(bool already_reserved, bool* pinned); - /// Unpins stream. If all is true, all blocks are unpinned, otherwise all blocks - /// except the write_block_ and read_block_ are unpinned. - Status UnpinStream(bool all = false); + /// Modes for UnpinStream(). + enum UnpinMode { + /// All blocks in the stream are unpinned and the read/write positions in the stream + /// are reset. No more rows can be written to the stream after this. The stream can + /// be re-read from the beginning by calling PrepareForRead(). + UNPIN_ALL, + /// All blocks are unpinned aside from the current read and write blocks (if any), + /// which is left in the same state. The unpinned stream can continue being read + /// or written from the current read or write positions. + UNPIN_ALL_EXCEPT_CURRENT, + }; + + /// Unpins stream with the given 'mode' as described above. + Status UnpinStream(UnpinMode mode); /// Get the next batch of output rows. Memory is still owned by the BufferedTupleStream /// and must be copied out by the caller. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/util/runtime-profile.cc ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc index 58ab501..b19ba33 100644 --- a/be/src/util/runtime-profile.cc +++ b/be/src/util/runtime-profile.cc @@ -374,25 +374,40 @@ void RuntimeProfile::ComputeTimeInProfile(int64_t total) { } void RuntimeProfile::AddChild(RuntimeProfile* child, bool indent, RuntimeProfile* loc) { - DCHECK(child != NULL); lock_guard<SpinLock> l(children_lock_); - if (child_map_.count(child->name_) > 0) { - // This child has already been added, so do nothing. - // Otherwise, the map and vector will be out of sync. - return; - } - child_map_[child->name_] = child; + ChildVector::iterator insert_pos; if (loc == NULL) { - children_.push_back(make_pair(child, indent)); + insert_pos = children_.end(); } else { + bool found = false; for (ChildVector::iterator it = children_.begin(); it != children_.end(); ++it) { if (it->first == loc) { - children_.insert(++it, make_pair(child, indent)); - return; + insert_pos = it + 1; + found = true; + break; } } - DCHECK(false) << "Invalid loc"; + DCHECK(found) << "Invalid loc"; } + AddChildLocked(child, indent, insert_pos); +} + +void RuntimeProfile::AddChildLocked( + RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos) { + children_lock_.DCheckLocked(); + DCHECK(child != NULL); + if (child_map_.count(child->name_) > 0) { + // This child has already been added, so do nothing. + // Otherwise, the map and vector will be out of sync. 
+ return; + } + child_map_[child->name_] = child; + children_.insert(insert_pos, make_pair(child, indent)); +} + +void RuntimeProfile::PrependChild(RuntimeProfile* child, bool indent) { + lock_guard<SpinLock> l(children_lock_); + AddChildLocked(child, indent, children_.begin()); } void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/util/runtime-profile.h ---------------------------------------------------------------------- diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h index 77220d8..513f39d 100644 --- a/be/src/util/runtime-profile.h +++ b/be/src/util/runtime-profile.h @@ -124,6 +124,10 @@ class RuntimeProfile { void AddChild(RuntimeProfile* child, bool indent = true, RuntimeProfile* location = NULL); + /// Adds a child profile, similarly to AddChild(). The child profile is put before any + /// existing profiles. + void PrependChild(RuntimeProfile* child, bool indent = true); + /// Sorts all children according to a custom comparator. Does not /// invalidate pointers to profiles. template <class Compare> @@ -428,8 +432,13 @@ class RuntimeProfile { /// Create a subtree of runtime profiles from nodes, starting at *node_idx. /// On return, *node_idx is the index one past the end of this subtree - static RuntimeProfile* CreateFromThrift(ObjectPool* pool, - const std::vector<TRuntimeProfileNode>& nodes, int* node_idx); + static RuntimeProfile* CreateFromThrift( + ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx); + + /// Inserts 'child' before the iterator 'insert_pos' in 'children_'. + /// 'children_lock_' must be held by the caller. 
+ void AddChildLocked( + RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos); /// Print the child counters of the given counter name static void PrintChildCounters(const std::string& prefix, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/tests/stress/concurrent_select.py ---------------------------------------------------------------------- diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py index a43b402..1d1d142 100755 --- a/tests/stress/concurrent_select.py +++ b/tests/stress/concurrent_select.py @@ -692,7 +692,7 @@ class Query(object): class QueryRunner(object): """Encapsulates functionality to run a query and provide a runtime report.""" - SPILLED_PATTERN = re.compile("ExecOption:.*Spilled") + SPILLED_PATTERNS= [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")] BATCH_SIZE = 1024 def __init__(self): @@ -765,8 +765,8 @@ class QueryRunner(object): # Producing a query profile can be somewhat expensive. A v-tune profile of # impalad showed 10% of cpu time spent generating query profiles. report.profile = cursor.get_profile() - report.mem_was_spilled = \ - QueryRunner.SPILLED_PATTERN.search(report.profile) is not None + report.mem_was_spilled = any([pattern.search(report.profile) is not None + for pattern in QueryRunner.SPILLED_PATTERNS]) except Exception as error: # A mem limit error would have been caught above, no need to check for that here. report.non_mem_limit_error = error
