Repository: incubator-impala
Updated Branches:
  refs/heads/master b7d107a69 -> 241c7e019


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 36dfce6..9827788 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -20,46 +20,88 @@
 #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
 
 #include <boost/scoped_ptr.hpp>
-#include <boost/unordered_set.hpp>
 #include <boost/thread.hpp>
+#include <list>
+#include <memory>
 #include <string>
 
 #include "exec/blocking-join-node.h"
 #include "exec/exec-node.h"
-#include "exec/filter-context.h"
-#include "exec/hash-table.h"
+#include "exec/partitioned-hash-join-builder.h"
 #include "runtime/buffered-block-mgr.h"
 
-#include "gen-cpp/PlanNodes_types.h"  // for TJoinOp
+#include "gen-cpp/Types_types.h"
 
 namespace impala {
 
 class BloomFilter;
 class BufferedBlockMgr;
+class BufferedTupleStream;
 class MemPool;
 class RowBatch;
 class RuntimeFilter;
 class TupleRow;
-class BufferedTupleStream;
 
-/// Operator to perform partitioned hash join, spilling to disk as necessary.
-/// A spilled partition is one that is not fully pinned.
-/// The operator runs in these distinct phases:
-///  1. Consume all build input and partition them. No hash tables are maintained.
-///  2. Construct hash tables from as many partitions as possible.
-///  3. Consume all the probe rows. Rows belonging to partitions that are spilled
-///     must be spilled as well.
-///  4. Iterate over the spilled partitions, construct the hash table from the spilled
-///     build rows and process the spilled probe rows. If the partition is still too
-///     big, repeat steps 1-4, using this spilled partitions build and probe rows as
-///     input.
-//
+/// Operator to perform partitioned hash join, spilling to disk as necessary. This
+/// operator implements multiple join modes with the same core algorithm.
+///
+/// The high-level algorithm is as follows:
+///  1. Consume all build input and partition it. No hash tables are maintained.
+///  2. Construct hash tables for as many unspilled partitions as possible.
+///  3. Consume the probe input. Each probe row is hashed to find the corresponding build
+///     partition. If the build partition is in-memory (i.e. not spilled), then the
+///     partition's hash table is probed and any matching rows can be outputted. If the
+///     build partition is spilled, the probe row must also be spilled for later
+///     processing.
+///  4. Any spilled partitions are processed. If the build rows and hash table for a
+///     spilled partition fit in memory, the spilled partition is brought into memory
+///     and its spilled probe rows are processed. Otherwise the spilled partition must be
+///     repartitioned into smaller partitions. Repartitioning repeats steps 1-3 above,
+///     except with the partition's spilled build and probe rows as input.
+///
+/// IMPLEMENTATION DETAILS:
+/// -----------------------
+/// The partitioned hash join algorithm is implemented with the PartitionedHashJoinNode
+/// and PhjBuilder classes. Each join node has a builder (see PhjBuilder) that
+/// partitions, stores and builds hash tables over the build rows.
+///
+/// The above algorithm is implemented as a state machine with the following phases:
+///
+///   1. [PARTITIONING_BUILD or REPARTITIONING_BUILD] Read build rows from child(1) OR
+///      from the spilled build rows of a partition and partition them into the builder's
+///      hash partitions. If there is sufficient memory, all build partitions are kept
+///      in memory. Otherwise, build partitions are spilled as needed to free up memory.
+///      Finally, build a hash table for each in-memory partition and create a probe
+///      partition with a write buffer for each spilled partition.
+///
+///      After the phase, the algorithm advances from PARTITIONING_BUILD to
+///      PARTITIONING_PROBE or from REPARTITIONING_BUILD to REPARTITIONING_PROBE.
+///
+///   2. [PARTITIONING_PROBE or REPARTITIONING_PROBE] Read the probe rows from child(0) or
+///      the spilled probe rows of a partition and partition them. If a probe row's
+///      partition is in memory, probe the partition's hash table, otherwise spill the
+///      probe row. Finally, output unmatched build rows for join modes that require it.
+///
+///      After the phase, the algorithm terminates if no spilled partitions remain or
+///      continues to process one of the remaining spilled partitions by advancing to
+///      either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether
+///      the spilled partition's hash table fits in memory or not.
+///
+///   3. [PROBING_SPILLED_PARTITION] Read the probe rows from a spilled partition that
+///      was brought back into memory and probe the partition's hash table. Finally,
+///      output unmatched build rows for join modes that require it.
+///
+///      After the phase, the algorithm terminates if no spilled partitions remain or
+///      continues to process one of the remaining spilled partitions by advancing to
+///      either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether
+///      the spilled partition's hash table fits in memory or not.
+///
+/// Null aware anti-join (NAAJ) extends the above algorithm by accumulating rows with
+/// NULLs into several different streams, which are processed in a separate step to
+/// produce additional output rows. The NAAJ algorithm is documented in more detail in
+/// header comments for the null aware functions and data structures.
+///
 /// TODO: don't copy tuple rows so often.
-/// TODO: we need multiple hash functions. Each repartition needs new hash functions
-/// or new bits. Multiplicative hashing?
-/// TODO: think about details about multithreading. Multiple partitions in parallel?
-/// Multiple threads against a single partition? How to build hash tables in parallel?
-/// TODO: BuildHashTables() should start with the partitions that are already pinned.
 class PartitionedHashJoinNode : public BlockingJoinNode {
  public:
   PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -78,102 +120,53 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   virtual Status ProcessBuildInput(RuntimeState* state);
 
  private:
-  class Partition;
-
-  /// Implementation details:
-  /// Logically, the algorithm runs in three modes.
-  ///   1. [PARTITIONING_BUILD or REPARTITIONING] Read the build side rows and partition
-  ///      them into hash_partitions_. This is a fixed fan out of the input. The input
-  ///      can either come from child(1) OR from the build tuple stream of partition
-  ///      that needs to be repartitioned.
-  ///   2. [PROCESSING_PROBE or REPARTITIONING] Read the probe side rows, partition them
-  ///      and either perform the join or spill them into hash_partitions_. If the
-  ///      partition has the hash table in memory, we perform the join, otherwise we
-  ///      spill the probe row. Similar to step one, the rows can come from child(0) or
-  ///      a spilled partition.
-  ///   3. [PROBING_SPILLED_PARTITION] Read and construct a single spilled partition.
-  ///      In this case we are walking a spilled partition and the hash table fits in
-  ///      memory. Neither the build nor probe side need to be partitioned and we just
-  ///      perform the join.
-  ///
-  /// States:
-  /// The transition goes from PARTITIONING_BUILD -> PROCESSING_PROBE ->
-  ///    PROBING_SPILLED_PARTITION/REPARTITIONING.
-  /// The last two steps will switch back and forth as many times as we need to
-  /// repartition.
+  class ProbePartition;
+
   enum HashJoinState {
-    /// Partitioning the build (right) child's input. Corresponds to mode 1 above but
-    /// only when consuming from child(1).
+    /// Partitioning the build (right) child's input into the builder's hash partitions.
     PARTITIONING_BUILD,
 
-    /// Processing the probe (left) child's input. Corresponds to mode 2 above but
-    /// only when consuming from child(0).
-    PROCESSING_PROBE,
+    /// Processing the probe (left) child's input, probing hash tables and
+    /// spilling probe rows into 'probe_hash_partitions_' if necessary.
+    PARTITIONING_PROBE,
 
-    /// Probing a spilled partition. The hash table for this partition fits in memory.
-    /// Corresponds to mode 3.
+    /// Processing the spilled probe rows of a single spilled partition
+    /// ('input_partition_') that fits in memory.
     PROBING_SPILLED_PARTITION,
 
-    /// Repartitioning a single spilled partition (input_partition_) into
-    /// hash_partitions_.
-    /// Corresponds to mode 1 & 2 but reading from a spilled partition.
-    REPARTITIONING,
+    /// Repartitioning the build rows of a single spilled partition ('input_partition_')
+    /// into the builder's hash partitions.
+    /// Corresponds to PARTITIONING_BUILD but reading from a spilled partition.
+    REPARTITIONING_BUILD,
+
+    /// Probing the repartitioned hash partitions of a single spilled partition
+    /// ('input_partition_') with the probe rows of that partition.
+    /// Corresponds to PARTITIONING_PROBE but reading from a spilled partition.
+    REPARTITIONING_PROBE,
   };
 
-  /// Number of initial partitions to create. Must be a power of two.
-  /// TODO: this is set to a lower than actual value for testing.
-  static const int PARTITION_FANOUT = 16;
-
-  /// Needs to be the log(PARTITION_FANOUT)
-  static const int NUM_PARTITIONING_BITS = 4;
-
-  /// Maximum number of times we will repartition. The maximum build table we
-  /// can process is:
-  /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). With a (low) 1GB
-  /// limit and 64 fanout, we can support 256TB build tables in the case where
-  /// there is no skew.
-  /// In the case where there is skew, repartitioning is unlikely to help (assuming a
-  /// reasonable hash function).
-  /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
-  /// TODO: we can revisit and try harder to explicitly detect skew.
-  static const int MAX_PARTITION_DEPTH = 16;
-
-  /// Append the row to stream. In the common case, the row is just in memory and the
-  /// append succeeds. If the append fails, we fallback to the slower path of
-  /// AppendRowStreamFull().
-  /// Returns true if the row was added and false otherwise. If false is returned,
-  /// *status contains the error (doesn't return status because this is very perf
-  /// sensitive).
-  bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status);
-
-  /// Slow path for AppendRow() above. It is called when the stream has failed to append
-  /// the row. We need to find more memory by either switching to IO-buffers, in case the
-  /// stream still uses small buffers, or spilling a partition.
-  bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row, Status* status);
-
-  /// Called when we need to free up memory by spilling a partition.
-  /// This function walks hash_partitions_ and picks one to spill.
-  /// *spilled_partition is the partition that was spilled.
-  /// Returns non-ok status if we couldn't spill a partition.
-  Status SpillPartition(Partition** spilled_partition);
-
-  /// Partitions the entire build input (either from child(1) or input_partition_) into
-  /// hash_partitions_. When this call returns, hash_partitions_ is ready to consume
-  /// the probe input.
-  /// 'level' is the level new partitions (in hash_partitions_) should be created with.
-  Status ProcessBuildInput(RuntimeState* state, int level);
-
-  /// Reads the rows in build_batch and partitions them in hash_partitions_. If
-  /// 'build_filters' is true, runtime filters are populated.
-  Status ProcessBuildBatch(RowBatch* build_batch, bool build_filters);
-
-  /// Call at the end of partitioning the build rows (which could be from the build child
-  /// or from repartitioning an existing partition). After this function returns, all
-  /// partitions in hash_partitions_ are ready to accept probe rows. This function
-  /// constructs hash tables for as many partitions as fit in memory (which can be none).
-  /// For the remaining partitions, this function initializes the probe spilling
-  /// structures.
-  Status BuildHashTables(RuntimeState* state);
+  /// Constants from PhjBuilder, added to this node for convenience.
+  static const int PARTITION_FANOUT = PhjBuilder::PARTITION_FANOUT;
+  static const int NUM_PARTITIONING_BITS = PhjBuilder::NUM_PARTITIONING_BITS;
+  static const int MAX_PARTITION_DEPTH = PhjBuilder::MAX_PARTITION_DEPTH;
+
+  /// Initialize 'probe_hash_partitions_' and 'hash_tbls_' before probing. One probe
+  /// partition is created per spilled build partition, and 'hash_tbls_' is initialized
+  /// with pointers to the hash tables of in-memory partitions and NULL pointers for
+  /// spilled or closed partitions.
+  /// Called after the builder has partitioned the build rows and built hash tables,
+  /// either in the initial build step, or after repartitioning a spilled partition.
+  /// After this function returns, all partitions are ready to process probe rows.
+  Status PrepareForProbe();
+
+  /// Creates an initialized probe partition at 'partition_idx' in
+  /// 'probe_hash_partitions_'.
+  void CreateProbePartition(
+      int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows);
+
+  /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
+  /// a write buffer allocated, so this will succeed unless an error is encountered.
+  Status AppendProbeRow(BufferedTupleStream* stream, TupleRow* row);
 
   /// Probes the hash table for rows matching the current probe row and appends
   /// all the matching build rows (with probe row) to output batch. Returns true
@@ -292,6 +285,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// new partition at the front of output_build_partitions_.
   void OutputUnmatchedBuild(RowBatch* out_batch);
 
+  /// Initializes 'null_aware_probe_partition_' and prepares its probe stream for writing.
+  Status InitNullAwareProbePartition();
+
+  /// Initializes 'null_probe_rows_' and prepares that stream for writing.
+  Status InitNullProbeRows();
+
   /// Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
   Status PrepareNullAwarePartition();
 
@@ -313,155 +312,75 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// false. Used for NAAJ.
   Status OutputNullAwareNullProbe(RuntimeState* state, RowBatch* out_batch);
 
-  /// Call at the end of consuming the probe rows. Walks hash_partitions_ and
-  ///  - If this partition had a hash table, close it. This partition is fully processed
-  ///    on both the build and probe sides. The streams are transferred to batch.
+  /// Call at the end of consuming the probe rows. Cleans up the build and probe hash
+  /// partitions and:
+  ///  - If the build partition had a hash table, close it. The build and probe
+  ///    partitions are fully processed. The streams are transferred to 'batch'.
   ///    In the case of right-outer and full-outer joins, instead of closing this
   ///    partition we put it on a list of partitions for which we need to flush their
   ///    unmatched rows.
-  ///  - If this partition did not have a hash table, meaning both sides were spilled,
-  ///    move the partition to spilled_partitions_.
+  ///  - If the build partition did not have a hash table, meaning both build and probe
+  ///    rows were spilled, move the partition to 'spilled_partitions_'.
   Status CleanUpHashPartitions(RowBatch* batch);
 
   /// Get the next row batch from the probe (left) side (child(0)). If we are done
-  /// consuming the input, sets probe_batch_pos_ to -1, otherwise, sets it to 0.
-  Status NextProbeRowBatch(RuntimeState*, RowBatch* out_batch);
+  /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
+  Status NextProbeRowBatch(RuntimeState* state, RowBatch* out_batch);
 
-  /// Get the next probe row batch from input_partition_. If we are done consuming the
-  /// input, sets probe_batch_pos_ to -1, otherwise, sets it to 0.
-  Status NextSpilledProbeRowBatch(RuntimeState*, RowBatch* out_batch);
+  /// Get the next probe row batch from 'input_partition_'. If we are done consuming the
+  /// input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
+  Status NextSpilledProbeRowBatch(RuntimeState* state, RowBatch* out_batch);
 
-  /// Moves onto the next spilled partition and initializes input_partition_. This
-  /// function processes the entire build side of input_partition_ and when this function
-  /// returns, we are ready to consume the probe side of input_partition_.
+  /// Moves onto the next spilled partition and initializes 'input_partition_'. This
+  /// function processes the entire build side of 'input_partition_' and when this
+  /// function returns, we are ready to consume the probe side of 'input_partition_'.
   /// If the build side's hash table fits in memory, we will construct input_partition_'s
   /// hash table. If it does not, meaning we need to repartition, this function will
-  /// initialize hash_partitions_.
-  Status PrepareNextPartition(RuntimeState*);
-
-  /// Iterates over all the partitions in hash_partitions_ and returns the number of rows
-  /// of the largest partition (in terms of number of build and probe rows).
-  int64_t LargestSpilledPartition() const;
+  /// repartition the build rows into 'builder->hash_partitions_' and prepare for
+  /// repartitioning the partition's probe rows.
+  Status PrepareSpilledPartitionForProbe(RuntimeState* state, bool* got_partition);
 
-  /// Calls Close() on every Partition in 'hash_partitions_',
-  /// 'spilled_partitions_', and 'output_build_partitions_' and then resets the lists,
-  /// the vector and the partition pool.
-  void ClosePartitions();
+  /// Calls Close() on every probe partition, destroys the partitions and cleans up any
+  /// references to the partitions. Also closes and destroys 'null_probe_rows_'.
+  void CloseAndDeletePartitions();
 
   /// Prepares for probing the next batch.
   void ResetForProbe();
 
-  /// For each filter in filters_, allocate a bloom_filter from the fragment-local
-  /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
-  /// phase.
-  void AllocateRuntimeFilters(RuntimeState* state);
-
-  /// Publish the runtime filters to the fragment-local
-  /// RuntimeFilterBank. 'total_build_rows' is used to determine whether the computed
-  /// filters have an unacceptably high false-positive rate.
-  void PublishRuntimeFilters(RuntimeState* state, int64_t total_build_rows);
-
   /// Codegen function to create output row. Assumes that the probe row is non-NULL.
   Status CodegenCreateOutputRow(LlvmCodeGen* codegen, llvm::Function** fn);
 
-  /// Codegen processing build batches.  Identical signature to ProcessBuildBatch.
-  /// Returns non-OK status if codegen was not possible.
-  Status CodegenProcessBuildBatch(RuntimeState* state, llvm::Function* hash_fn,
-      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn);
-
   /// Codegen processing probe batches.  Identical signature to ProcessProbeBatch.
   /// Returns non-OK if codegen was not possible.
-  Status CodegenProcessProbeBatch(
-      RuntimeState* state, llvm::Function* hash_fn, llvm::Function* murmur_hash_fn);
-
-  /// Codegen inserting batches into a partition's hash table. Identical signature to
-  /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
-  Status CodegenInsertBatch(RuntimeState* state, llvm::Function* hash_fn,
-      llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn);
+  Status CodegenProcessProbeBatch(RuntimeState* state);
 
   /// Returns the current state of the partition as a string.
   std::string PrintState() const;
 
-  /// Updates state_ to 's', logging the transition.
-  void UpdateState(HashJoinState s);
+  /// Updates 'state_' to 'next_state', logging the transition.
+  void UpdateState(HashJoinState next_state);
 
   std::string NodeDebugString() const;
 
-  /// We need two output buffers per partition (one for build and one for probe) and
-  /// and two additional buffers for the input (while repartitioning; for the build and
-  /// probe sides).
-  /// For NAAJ, we need 3 additional buffers to maintain the null_aware_partition_.
-  int MinRequiredBuffers() const {
-    int num_reserved_buffers = PARTITION_FANOUT * 2 + 2;
-    num_reserved_buffers += join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ? 3 : 0;
-    return num_reserved_buffers;
-  }
-
   RuntimeState* runtime_state_;
 
   /// Our equi-join predicates "<lhs> = <rhs>" are separated into
   /// build_expr_ctxs_ (over child(1)) and probe_expr_ctxs_ (over child(0))
-  std::vector<ExprContext*> probe_expr_ctxs_;
   std::vector<ExprContext*> build_expr_ctxs_;
-
-  /// List of filters to build during build phase.
-  std::vector<FilterContext> filters_;
-
-  /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
-  /// NOT DISTINCT FROM, rather than equality.
-  std::vector<bool> is_not_distinct_from_;
+  std::vector<ExprContext*> probe_expr_ctxs_;
 
   /// Non-equi-join conjuncts from the ON clause.
   std::vector<ExprContext*> other_join_conjunct_ctxs_;
 
-  /// Codegen doesn't allow for automatic Status variables because then exception
-  /// handling code is needed to destruct the Status, and our function call substitution
-  /// doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by
-  /// placing the build-side status here so exceptions won't need to destruct it.
-  /// This status should used directly only by ProcesssBuildBatch().
-  /// TODO: fix IMPALA-1948 and remove this.
-  Status build_status_;
-
-  /// Client to the buffered block mgr.
-  BufferedBlockMgr::Client* block_mgr_client_;
-
   /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
-  /// TODO: If we want to multi-thread then this context should be thread-local and not
-  /// associated with the node.
   boost::scoped_ptr<HashTableCtx> ht_ctx_;
 
   /// The iterator that corresponds to the look up of current_probe_row_.
   HashTable::Iterator hash_tbl_iterator_;
 
-  /// Total time spent partitioning build.
-  RuntimeProfile::Counter* partition_build_timer_;
-
-  /// Total number of hash buckets across all partitions.
-  RuntimeProfile::Counter* num_hash_buckets_;
-
-  /// Total number of partitions created.
-  RuntimeProfile::Counter* partitions_created_;
-
-  /// Level of max partition (i.e. number of repartitioning steps).
-  RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
-
-  /// Number of build/probe rows that have been partitioned.
+  /// Number of probe rows that have been partitioned.
   RuntimeProfile::Counter* num_probe_rows_partitioned_;
 
-  /// Number of partitions that have been repartitioned.
-  RuntimeProfile::Counter* num_repartitions_;
-
-  /// Number of partitions that have been spilled.
-  RuntimeProfile::Counter* num_spilled_partitions_;
-
-  /// The largest fraction (of build side) after repartitioning. This is expected to be
-  /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
-  RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
-
-  /// Number of hash collisions - unequal rows that have identical hash values
-  RuntimeProfile::Counter* num_hash_collisions_;
-
   /// Time spent evaluating other_join_conjuncts for NAAJ.
   RuntimeProfile::Counter* null_aware_eval_timer_;
 
@@ -471,60 +390,52 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// State of the partitioned hash join algorithm. Used just for debugging.
   HashJoinState state_;
 
-  /// Object pool that holds the Partition objects in hash_partitions_.
-  boost::scoped_ptr<ObjectPool> partition_pool_;
-
-  /// The current set of partitions that are being built. This is only used in
-  /// mode 1 and 2 when we need to partition the build and probe inputs.
-  /// This is not used when processing a single partition.
-  /// After CleanUpHashPartitions() is called this vector should be empty.
-  std::vector<Partition*> hash_partitions_;
-
-  /// The list of partitions that have been spilled on both sides and still need more
-  /// processing. These partitions could need repartitioning, in which case more
-  /// partitions will be added to this list after repartitioning.
-  /// This list is populated at CleanUpHashPartitions().
-  std::list<Partition*> spilled_partitions_;
+  /// The build-side of the join. Initialized in Init().
+  boost::scoped_ptr<PhjBuilder> builder_;
 
   /// Cache of the per partition hash table to speed up ProcessProbeBatch.
   /// In the case where we need to partition the probe:
-  ///  hash_tbls_[i] = hash_partitions_[i]->hash_tbl();
+  ///  hash_tbls_[i] = builder_->hash_partitions_[i]->hash_tbl();
   /// In the case where we don't need to partition the probe:
   ///  hash_tbls_[i] = input_partition_->hash_tbl();
   HashTable* hash_tbls_[PARTITION_FANOUT];
 
-  /// The current input partition to be processed (not in spilled_partitions_).
-  /// This partition can either serve as the source for a repartitioning step, or
-  /// if the hash table fits in memory, the source of the probe rows.
-  Partition* input_partition_;
+  /// Probe partitions, with indices corresponding to the build partitions in
+  /// builder_->hash_partitions(). This is non-empty only in the PARTITIONING_PROBE or
+  /// REPARTITIONING_PROBE states, in which case it has NULL entries for in-memory
+  /// build partitions and non-NULL entries for spilled build partitions (so that we
+  /// have somewhere to spill the probe rows for the spilled partition).
+  std::vector<std::unique_ptr<ProbePartition>> probe_hash_partitions_;
+
+  /// The list of probe partitions that have been spilled and still need more
+  /// processing. These partitions could need repartitioning, in which case more
+  /// partitions will be added to this list after repartitioning.
+  /// This list is populated at CleanUpHashPartitions().
+  std::list<std::unique_ptr<ProbePartition>> spilled_partitions_;
+
+  /// The current spilled probe partition being processed as input to repartitioning,
+  /// or the source of the probe rows if the hash table fits in memory.
+  std::unique_ptr<ProbePartition> input_partition_;
 
   /// In the case of right-outer and full-outer joins, this is the list of the partitions
   /// for which we need to output their unmatched build rows.
   /// This list is populated at CleanUpHashPartitions().
-  std::list<Partition*> output_build_partitions_;
-
-  /// Partition used if null_aware_ is set. This partition is always processed at the end
-  /// after all build and probe rows are processed. Rows are added to this partition along
-  /// the way.
-  /// In this partition's build_rows_, we store all the rows for which build_expr_ctxs_
-  /// evaluated over the row returns NULL (i.e. it has a NULL on the eq join slot).
-  /// In this partition's probe_rows, we store all probe rows that did not have a match
-  /// in the hash table.
-  /// At the very end, we then iterate over all the probe rows. For each probe row, we
-  /// return the rows that did not match any of the build rows.
-  /// NULL if we this join is not null aware or we are done processing this partition.
-  Partition* null_aware_partition_;
+  std::list<PhjBuilder::Partition*> output_build_partitions_;
 
   /// Used while processing null_aware_partition_. It contains all the build tuple rows
   /// with a NULL when evaluating the hash table expr.
   boost::scoped_ptr<RowBatch> nulls_build_batch_;
 
-  /// If true, the build side has at least one row.
-  bool non_empty_build_;
+  /// Partition used if 'null_aware_' is set. During probing, rows from the probe
+  /// side that did not have a match in the hash table are appended to this partition.
+  /// At the very end, we then iterate over the partition's probe rows. For each probe
+  /// row, we return the rows that did not match any of the partition's build rows. This
+  /// is NULL if this join is not null aware or we are done processing this partition.
+  boost::scoped_ptr<ProbePartition> null_aware_probe_partition_;
 
   /// For NAAJ, this stream contains all probe rows that had NULL on the hash table
-  /// conjuncts.
-  BufferedTupleStream* null_probe_rows_;
+  /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches.
+  std::unique_ptr<BufferedTupleStream> null_probe_rows_;
 
   /// For each row in null_probe_rows_, true if this row has matched any build row
   /// (i.e. the resulting joined row passes other_join_conjuncts).
@@ -538,99 +449,55 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// END: Members that must be Reset()
   /////////////////////////////////////////
 
-  class Partition {
+  /// The probe-side partition corresponding to a build partition. The probe partition
+  /// is created when a build partition is spilled so that probe rows can be spilled to
+  /// disk for later processing.
+  class ProbePartition {
    public:
-    Partition(RuntimeState* state, PartitionedHashJoinNode* parent, int level);
-    ~Partition();
-
-    BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_; }
-    BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_; }
-    HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
-
-    bool ALWAYS_INLINE is_closed() const { return is_closed_; }
-    bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
-
-    /// Must be called once per partition to release any resources. This should be called
-    /// as soon as possible to release memory.
-    /// If batch is non-null, the build and probe streams are attached to the batch,
-    /// transferring ownership to them.
+    /// Create a new probe partition. 'probe_rows' should be an empty unpinned stream
+    /// that has been prepared for writing with an I/O-sized write buffer.
+    /// that has been prepared for writing with an I/O-sized write buffer.
+    ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent,
+        PhjBuilder::Partition* build_partition,
+        std::unique_ptr<BufferedTupleStream> probe_rows);
+    ~ProbePartition();
+
+    /// Prepare to read the probe rows. Allocates the first read block, so reads will
+    /// not fail with out of memory if this succeeds. Returns an error if the first read
+    /// block cannot be acquired. "delete_on_read" mode is used, so the blocks backing
+    /// the buffered tuple stream will be destroyed after reading.
+    Status PrepareForRead();
+
+    /// Close the partition and attach resources to 'batch' if non-NULL or free the
+    /// resources if 'batch' is NULL. Idempotent.
+    /// resources if 'batch' is NULL. Idempotent.
     void Close(RowBatch* batch);
 
-    /// Returns the estimated size of the in memory size for the build side of this
-    /// partition. This includes the entire build side and the hash table.
-    int64_t EstimatedInMemSize() const;
-
-    /// Pins the build tuples for this partition and constructs the hash_tbl_ from it.
-    /// Build rows cannot be added after calling this.
-    /// If the partition could not be built due to memory pressure, *built is set to false
-    /// and the caller is responsible for spilling this partition.
-    Status BuildHashTable(RuntimeState* state, bool* built);
+    BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
+    PhjBuilder::Partition* build_partition() { return build_partition_; }
 
-    /// Spills this partition, cleaning up and unpinning blocks.
-    /// If 'unpin_all_build' is true, the build stream is completely unpinned, otherwise,
-    /// it is unpinned with one buffer remaining.
-    Status Spill(bool unpin_all_build);
+    inline bool IsClosed() const { return probe_rows_ == NULL; }
 
    private:
-    friend class PartitionedHashJoinNode;
-
-    /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
-    /// containing the index of each row's index into the hash table's tuple stream.
-    /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
-    /// table buckets which the rows hashes to will be prefetched. This parameter is
-    /// replaced with a constant during codegen time. This function may be replaced with
-    /// a codegen'd version. Returns true if all rows in 'batch' are successfully
-    /// inserted.
-    bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
-        RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
-
     PartitionedHashJoinNode* parent_;
 
-    /// This partition is completely processed and nothing needs to be done for it again.
-    /// All resources associated with this partition are returned.
-    bool is_closed_;
-
-    /// True if this partition is spilled.
-    bool is_spilled_;
-
-    /// How many times rows in this partition have been repartitioned. Partitions created
-    /// from the node's children's input is level 0, 1 after the first repartitionining,
-    /// etc.
-    int level_;
+    /// The corresponding build partition. Not NULL. Owned by PhjBuilder.
+    PhjBuilder::Partition* build_partition_;
 
-    /// The hash table for this partition.
-    boost::scoped_ptr<HashTable> hash_tbl_;
-
-    /// Stream of build/probe tuples in this partition. Allocated from the runtime state's
-    /// object pool. Initially owned by this object (meaning it has to call Close() on it)
-    /// but transferred to the parent exec node (via the row batch) when the partition
-    /// is complete.
-    /// If NULL, ownership has been transfered.
-    BufferedTupleStream* build_rows_;
-    BufferedTupleStream* probe_rows_;
+    /// Stream of probe tuples in this partition. Initially owned by this object but
+    /// transferred to the parent exec node (via the row batch) when the partition
+    /// is complete. If NULL, ownership was transferred and the partition is closed.
+    std::unique_ptr<BufferedTupleStream> probe_rows_;
   };
 
   /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
   /// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is
   /// used for subsequent levels.
 
-  typedef Status (*ProcessBuildBatchFn)(PartitionedHashJoinNode*, RowBatch*,
-      bool build_filters);
-  /// Jitted ProcessBuildBatch function pointers.  NULL if codegen is disabled.
-  ProcessBuildBatchFn process_build_batch_fn_;
-  ProcessBuildBatchFn process_build_batch_fn_level0_;
-
   typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*,
       TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*);
   /// Jitted ProcessProbeBatch function pointers.  NULL if codegen is disabled.
   ProcessProbeBatchFn process_probe_batch_fn_;
   ProcessProbeBatchFn process_probe_batch_fn_level0_;
 
-  typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*,
-      RowBatch*, const std::vector<BufferedTupleStream::RowIdx>&);
-  /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
-  InsertBatchFn insert_batch_fn_;
-  InsertBatchFn insert_batch_fn_level0_;
 };
 
 }
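
The state machine described in the new header comment is easier to see in isolation
than in the diff. The following is a minimal, self-contained C++ sketch (not Impala
code): the enum values mirror the new HashJoinState, but the driver loop, the
SpilledPartition type, the FitsInMemory() helper and the memory budget are all
hypothetical simplifications of what PhjBuilder and the block manager decide in the
real operator.

#include <iostream>
#include <queue>

// Mirrors the new HashJoinState enum from partitioned-hash-join-node.h.
enum HashJoinState {
  PARTITIONING_BUILD,
  PARTITIONING_PROBE,
  PROBING_SPILLED_PARTITION,
  REPARTITIONING_BUILD,
  REPARTITIONING_PROBE,
};

const char* Name(HashJoinState s) {
  switch (s) {
    case PARTITIONING_BUILD: return "PARTITIONING_BUILD";
    case PARTITIONING_PROBE: return "PARTITIONING_PROBE";
    case PROBING_SPILLED_PARTITION: return "PROBING_SPILLED_PARTITION";
    case REPARTITIONING_BUILD: return "REPARTITIONING_BUILD";
    case REPARTITIONING_PROBE: return "REPARTITIONING_PROBE";
  }
  return "";
}

// Hypothetical stand-in for a spilled build/probe partition pair.
struct SpilledPartition {
  long build_bytes;
  // The real decision is made by the block manager; the toy compares
  // against a fixed budget.
  bool FitsInMemory(long budget) const { return build_bytes <= budget; }
};

int main() {
  const long mem_budget = 100;
  std::queue<SpilledPartition> spilled;  // stand-in for spilled_partitions_
  spilled.push({40});                    // fits: can be probed directly
  spilled.push({400});                   // too big: must be repartitioned

  HashJoinState state = PARTITIONING_BUILD;  // initial build pass over child(1)
  std::cout << Name(state) << "\n";
  state = PARTITIONING_PROBE;                // initial probe pass over child(0)
  std::cout << Name(state) << "\n";

  // Process the remaining spilled partitions one at a time.
  while (!spilled.empty()) {
    SpilledPartition p = spilled.front();
    spilled.pop();
    if (p.FitsInMemory(mem_budget)) {
      state = PROBING_SPILLED_PARTITION;  // build its hash table, probe directly
    } else {
      state = REPARTITIONING_BUILD;       // re-read and re-split the build rows
      std::cout << Name(state) << "\n";
      state = REPARTITIONING_PROBE;       // then re-split its spilled probe rows
      // Pieces that still do not fit would be pushed back onto 'spilled';
      // the toy models a 16-way fanout shrinking the partition.
      spilled.push({p.build_bytes / 16});
    }
    std::cout << Name(state) << "\n";
  }
  return 0;
}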

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index ffeeb45..a53b40e 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -32,12 +32,6 @@ inline void PartitionedHashJoinNode::ResetForProbe() {
   ht_ctx_->expr_values_cache()->Reset();
 }
 
-inline bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream,
-    TupleRow* row, Status* status) {
-  if (LIKELY(stream->AddRow(row, status))) return true;
-  return AppendRowStreamFull(stream, row, status);
-}
-
 }
 
 #endif
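
The deleted inline function above was a classic fast-path/slow-path split: try the
cheap in-memory AddRow() first and only fall back to the out-of-line
AppendRowStreamFull() when the stream has no room. It became unnecessary because the
new AppendProbeRow() contract guarantees the stream already has a write buffer. Below
is a toy, self-contained illustration of the deleted idiom; ToyStream and the "grow"
fallback are invented for the example, and LIKELY is approximated with a GCC/Clang
builtin.

#include <cstdio>

// Approximation of Impala's LIKELY macro.
#define LIKELY(expr) __builtin_expect(!!(expr), 1)

struct ToyStream {
  int capacity = 2;
  int size = 0;
  // Cheap attempt: succeeds while there is room.
  bool AddRow(int /*row*/) { return size < capacity ? (++size, true) : false; }
};

// Out-of-line slow path: the real code switched to I/O buffers or spilled a
// partition to make room; the toy just grows the stream.
bool AppendRowStreamFull(ToyStream* s, int row) {
  std::puts("slow path: making room");
  s->capacity *= 2;
  return s->AddRow(row);
}

// Shape of the deleted PartitionedHashJoinNode::AppendRow() fast path.
inline bool AppendRow(ToyStream* s, int row) {
  if (LIKELY(s->AddRow(row))) return true;
  return AppendRowStreamFull(s, row);
}

int main() {
  ToyStream s;
  for (int i = 0; i < 5; ++i) AppendRow(&s, i);
  return 0;
}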

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index b2c3973..bf7b595 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -15,15 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/runtime-state.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/mem-pool.h"
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/runtime-state.h"
 #include "runtime/tmp-file-mgr.h"
-#include "util/runtime-profile-counters.h"
+#include "util/debug-util.h"
 #include "util/disk-info.h"
 #include "util/filesystem-util.h"
 #include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
 
 #include <openssl/rand.h>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 8d07584..20af23e 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -290,8 +290,13 @@ class SimpleTupleStreamTest : public testing::Test {
     BufferedTupleStream stream(runtime_state_, *desc, runtime_state_->block_mgr(),
         client_, use_small_buffers, false);
     ASSERT_OK(stream.Init(-1, NULL, true));
+    bool got_write_buffer;
+    ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+    ASSERT_TRUE(got_write_buffer);
 
-    if (unpin_stream) ASSERT_OK(stream.UnpinStream());
+    if (unpin_stream) {
+      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+    }
     // Add rows to the stream
     int offset = 0;
     for (int i = 0; i < num_batches; ++i) {
@@ -339,10 +344,15 @@ class SimpleTupleStreamTest : public testing::Test {
           client_, small_buffers == 0,  // initial small buffers
           true); // read_write
       ASSERT_OK(stream.Init(-1, NULL, true));
+      bool got_write_buffer;
+      ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+      ASSERT_TRUE(got_write_buffer);
       bool got_read_buffer;
       ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer));
       ASSERT_TRUE(got_read_buffer);
-      if (unpin_stream) ASSERT_OK(stream.UnpinStream());
+      if (unpin_stream) {
+        ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+      }
 
       vector<int> results;
 
@@ -555,6 +565,9 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
   BufferedTupleStream stream(runtime_state_, *row_desc, runtime_state_->block_mgr(),
       client_, true, false);
   ASSERT_OK(stream.Init(-1, NULL, true));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
 
   int offset = 0;
   bool full = false;
@@ -571,7 +584,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
     offset += j;
   }
 
-  ASSERT_OK(stream.UnpinStream());
+  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
 
   bool pinned = false;
   ASSERT_OK(stream.PinStream(false, &pinned));
@@ -624,6 +637,9 @@ TEST_F(SimpleTupleStreamTest, SmallBuffers) {
   BufferedTupleStream stream(runtime_state_, *int_desc_, runtime_state_->block_mgr(),
       client_, true, false);
   ASSERT_OK(stream.Init(-1, NULL, false));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
 
   // Initial buffer should be small.
   EXPECT_LT(stream.bytes_in_mem(false), buffer_size);
@@ -849,6 +865,9 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
   BufferedTupleStream stream(runtime_state_, *string_desc_, runtime_state_->block_mgr(),
       client_, false, false);
   ASSERT_OK(stream.Init(-1, NULL, false));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
 
   for (int i = 0; i < num_batches; ++i) {
     RowBatch* batch = CreateStringBatch(rows_added, 1, false);
@@ -1005,6 +1024,10 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]);
   int array_len_index = 0;
   ASSERT_OK(stream.Init(-1, NULL, false));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
+
   for (int i = 0; i < NUM_ROWS; ++i) {
     int expected_row_size = tuple_descs[0]->byte_size() + tuple_descs[1]->byte_size();
     gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 978a221..b2fce45 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -120,9 +120,8 @@ BufferedTupleStream::~BufferedTupleStream() {
 // num_pinned_.
 int NumPinned(const list<BufferedBlockMgr::Block*>& blocks) {
   int num_pinned = 0;
-  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks.begin();
-      it != blocks.end(); ++it) {
-    if ((*it)->is_pinned() && (*it)->is_max_size()) ++num_pinned;
+  for (BufferedBlockMgr::Block* block : blocks) {
+    if (block->is_pinned() && block->is_max_size()) ++num_pinned;
   }
   return num_pinned;
 }
@@ -141,10 +140,9 @@ string BufferedTupleStream::DebugString() const {
     ss << *read_block_;
   }
   ss << " blocks=[\n";
-  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    ss << "{" << (*it)->DebugString() << "}";
-    if (*it != blocks_.back()) ss << ",\n";
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    ss << "{" << block->DebugString() << "}";
+    if (block != blocks_.back()) ss << ",\n";
   }
   ss << "]";
   return ss.str();
@@ -169,15 +167,15 @@ Status BufferedTupleStream::Init(int node_id, RuntimeProfile* profile, bool pinn
   if (block_mgr_->max_block_size() < INITIAL_BLOCK_SIZES[0]) {
     use_small_buffers_ = false;
   }
-
-  bool got_block;
-  RETURN_IF_ERROR(NewWriteBlockForRow(fixed_tuple_row_size_, &got_block));
-  if (!got_block) return block_mgr_->MemLimitTooLowError(block_mgr_client_, node_id);
-  DCHECK(write_block_ != NULL);
-  if (!pinned) RETURN_IF_ERROR(UnpinStream());
+  if (!pinned) RETURN_IF_ERROR(UnpinStream(UNPIN_ALL_EXCEPT_CURRENT));
   return Status::OK();
 }
 
+Status BufferedTupleStream::PrepareForWrite(bool* got_buffer) {
+  DCHECK(write_block_ == NULL);
+  return NewWriteBlockForRow(fixed_tuple_row_size_, got_buffer);
+}
+
 Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) {
   if (!use_small_buffers_) {
     *got_buffer = (write_block_ != NULL);
@@ -194,9 +192,8 @@ Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) {
 }
 
 void BufferedTupleStream::Close() {
-  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    (*it)->Delete();
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    block->Delete();
   }
   blocks_.clear();
   num_pinned_ = 0;
@@ -206,12 +203,11 @@ void BufferedTupleStream::Close() {
 
 int64_t BufferedTupleStream::bytes_in_mem(bool ignore_current) const {
   int64_t result = 0;
-  for (list<BufferedBlockMgr::Block*>::const_iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    if (!(*it)->is_pinned()) continue;
-    if (!(*it)->is_max_size()) continue;
-    if (*it == write_block_ && ignore_current) continue;
-    result += (*it)->buffer_len();
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    if (!block->is_pinned()) continue;
+    if (!block->is_max_size()) continue;
+    if (block == write_block_ && ignore_current) continue;
+    result += block->buffer_len();
   }
   return result;
 }
@@ -394,13 +390,12 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer
     write_block_ = NULL;
   }
 
-  // Walk the blocks and pin the first non-io sized block.
-  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    if (!(*it)->is_pinned()) {
+  // Walk the blocks and pin the first IO-sized block.
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    if (!block->is_pinned()) {
       SCOPED_TIMER(pin_timer_);
       bool current_pinned;
-      RETURN_IF_ERROR((*it)->Pin(&current_pinned));
+      RETURN_IF_ERROR(block->Pin(&current_pinned));
       if (!current_pinned) {
         *got_buffer = false;
         return Status::OK();
@@ -408,7 +403,7 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer
       ++num_pinned_;
       DCHECK_EQ(num_pinned_, NumPinned(blocks_));
     }
-    if ((*it)->is_max_size()) break;
+    if (block->is_max_size()) break;
   }
 
   read_block_ = blocks_.begin();
@@ -437,12 +432,11 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
     }
   }
 
-  for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-      it != blocks_.end(); ++it) {
-    if ((*it)->is_pinned()) continue;
+  for (BufferedBlockMgr::Block* block : blocks_) {
+    if (block->is_pinned()) continue;
     {
       SCOPED_TIMER(pin_timer_);
-      RETURN_IF_ERROR((*it)->Pin(pinned));
+      RETURN_IF_ERROR(block->Pin(pinned));
     }
     if (!*pinned) {
       VLOG_QUERY << "Should have been reserved." << endl
@@ -457,9 +451,8 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
     // Populate block_start_idx_ on pin.
     DCHECK_EQ(block_start_idx_.size(), blocks_.size());
     block_start_idx_.clear();
-    for (list<BufferedBlockMgr::Block*>::iterator it = blocks_.begin();
-        it != blocks_.end(); ++it) {
-      block_start_idx_.push_back((*it)->buffer());
+    for (BufferedBlockMgr::Block* block : blocks_) {
+      block_start_idx_.push_back(block->buffer());
     }
   }
   *pinned = true;
@@ -467,18 +460,20 @@ Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
   return Status::OK();
 }
 
-Status BufferedTupleStream::UnpinStream(bool all) {
+Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
   DCHECK(!closed_);
+  DCHECK(mode == UNPIN_ALL || mode == UNPIN_ALL_EXCEPT_CURRENT);
   SCOPED_TIMER(unpin_timer_);
 
   for (BufferedBlockMgr::Block* block: blocks_) {
     if (!block->is_pinned()) continue;
-    if (!all && (block == write_block_ || (read_write_ && block == *read_block_))) {
+    if (mode == UNPIN_ALL_EXCEPT_CURRENT
+        && (block == write_block_ || (read_write_ && block == *read_block_))) {
       continue;
     }
     RETURN_IF_ERROR(UnpinBlock(block));
   }
-  if (all) {
+  if (mode == UNPIN_ALL) {
     read_block_ = blocks_.end();
     write_block_ = NULL;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index e14f16c..d3bfa81 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -228,6 +228,13 @@ class BufferedTupleStream {
   /// 'node_id' is only used for error reporting.
   Status Init(int node_id, RuntimeProfile* profile, bool pinned);
 
+  /// Prepares the stream for writing by attempting to allocate a write block.
+  /// Called after Init() and before the first AddRow() call.
+  /// 'got_buffer': set to true if the first write block was successfully pinned, or
+  ///     false if the block could not be pinned and no error was encountered. Undefined
+  ///     if an error status is returned.
+  Status PrepareForWrite(bool* got_buffer);
+
   /// Must be called for streams using small buffers to switch to IO-sized buffers.
   /// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_
   /// back to false.
@@ -269,9 +276,20 @@ class BufferedTupleStream {
   /// block_mgr_client_ to pin the stream.
   Status PinStream(bool already_reserved, bool* pinned);
 
-  /// Unpins stream. If all is true, all blocks are unpinned, otherwise all blocks
-  /// except the write_block_ and read_block_ are unpinned.
-  Status UnpinStream(bool all = false);
+  /// Modes for UnpinStream().
+  enum UnpinMode {
+    /// All blocks in the stream are unpinned and the read/write positions in the stream
+    /// are reset. No more rows can be written to the stream after this. The stream can
+    /// be re-read from the beginning by calling PrepareForRead().
+    UNPIN_ALL,
+    /// All blocks are unpinned aside from the current read and write blocks (if any),
+    /// which are left in the same state. The unpinned stream can continue being read
+    /// or written from the current read or write positions.
+    UNPIN_ALL_EXCEPT_CURRENT,
+  };
+
+  /// Unpins stream with the given 'mode' as described above.
+  Status UnpinStream(UnpinMode mode);
 
   /// Get the next batch of output rows. Memory is still owned by the BufferedTupleStream
   /// and must be copied out by the caller.
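
Taken together, the stream changes split initialization into two steps (Init() no
longer allocates the write block, so callers must call PrepareForWrite() before the
first AddRow()) and make the unpin policy explicit via UnpinMode. The toy model below
is a self-contained sketch of just the two modes' semantics; ToyStream, its Block
flag and main() are invented stand-ins, not the real BufferedTupleStream, which
unpins buffer-manager blocks under the same rules.

#include <cassert>
#include <list>

// A "block" reduced to a pinned/unpinned flag.
struct Block { bool pinned = true; };

struct ToyStream {
  enum UnpinMode { UNPIN_ALL, UNPIN_ALL_EXCEPT_CURRENT };

  std::list<Block> blocks;
  Block* write_block = nullptr;
  Block* read_block = nullptr;

  // Mirrors the control flow of the new BufferedTupleStream::UnpinStream():
  // the current read and write blocks survive UNPIN_ALL_EXCEPT_CURRENT.
  void UnpinStream(UnpinMode mode) {
    for (Block& b : blocks) {
      if (!b.pinned) continue;
      if (mode == UNPIN_ALL_EXCEPT_CURRENT
          && (&b == write_block || &b == read_block)) {
        continue;
      }
      b.pinned = false;
    }
    if (mode == UNPIN_ALL) {
      // No more rows can be written; read/write positions are reset.
      read_block = nullptr;
      write_block = nullptr;
    }
  }
};

int main() {
  ToyStream s;
  s.blocks.assign(3, Block{});
  s.write_block = &s.blocks.back();

  s.UnpinStream(ToyStream::UNPIN_ALL_EXCEPT_CURRENT);
  assert(!s.blocks.front().pinned);  // older blocks are unpinned...
  assert(s.write_block->pinned);     // ...but writing can continue.

  s.UnpinStream(ToyStream::UNPIN_ALL);
  assert(!s.blocks.back().pinned);   // everything unpinned,
  assert(s.write_block == nullptr);  // positions reset.
  return 0;
}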

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 58ab501..b19ba33 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -374,25 +374,40 @@ void RuntimeProfile::ComputeTimeInProfile(int64_t total) {
 }
 
 void RuntimeProfile::AddChild(RuntimeProfile* child, bool indent, RuntimeProfile* loc) {
-  DCHECK(child != NULL);
   lock_guard<SpinLock> l(children_lock_);
-  if (child_map_.count(child->name_) > 0) {
-    // This child has already been added, so do nothing.
-    // Otherwise, the map and vector will be out of sync.
-    return;
-  }
-  child_map_[child->name_] = child;
+  ChildVector::iterator insert_pos;
   if (loc == NULL) {
-    children_.push_back(make_pair(child, indent));
+    insert_pos = children_.end();
   } else {
+    bool found = false;
     for (ChildVector::iterator it = children_.begin(); it != children_.end(); ++it) {
       if (it->first == loc) {
-        children_.insert(++it, make_pair(child, indent));
-        return;
+        insert_pos = it + 1;
+        found = true;
+        break;
       }
     }
-    DCHECK(false) << "Invalid loc";
+    DCHECK(found) << "Invalid loc";
   }
+  AddChildLocked(child, indent, insert_pos);
+}
+
+void RuntimeProfile::AddChildLocked(
+    RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos) {
+  children_lock_.DCheckLocked();
+  DCHECK(child != NULL);
+  if (child_map_.count(child->name_) > 0) {
+    // This child has already been added, so do nothing.
+    // Otherwise, the map and vector will be out of sync.
+    return;
+  }
+  child_map_[child->name_] = child;
+  children_.insert(insert_pos, make_pair(child, indent));
+}
+
+void RuntimeProfile::PrependChild(RuntimeProfile* child, bool indent) {
+  lock_guard<SpinLock> l(children_lock_);
+  AddChildLocked(child, indent, children_.begin());
 }
 
 void RuntimeProfile::GetChildren(vector<RuntimeProfile*>* children) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 77220d8..513f39d 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -124,6 +124,10 @@ class RuntimeProfile {
   void AddChild(RuntimeProfile* child,
       bool indent = true, RuntimeProfile* location = NULL);
 
+  /// Adds a child profile, similarly to AddChild(). The child profile is put before any
+  /// existing profiles.
+  void PrependChild(RuntimeProfile* child, bool indent = true);
+
   /// Sorts all children according to a custom comparator. Does not
   /// invalidate pointers to profiles.
   template <class Compare>
@@ -428,8 +432,13 @@ class RuntimeProfile {
 
   /// Create a subtree of runtime profiles from nodes, starting at *node_idx.
   /// On return, *node_idx is the index one past the end of this subtree
-  static RuntimeProfile* CreateFromThrift(ObjectPool* pool,
-      const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
+  static RuntimeProfile* CreateFromThrift(
+      ObjectPool* pool, const std::vector<TRuntimeProfileNode>& nodes, int* node_idx);
+
+  /// Inserts 'child' before the iterator 'insert_pos' in 'children_'.
+  /// 'children_lock_' must be held by the caller.
+  void AddChildLocked(
+      RuntimeProfile* child, bool indent, ChildVector::iterator insert_pos);
 
   /// Print the child counters of the given counter name
   static void PrintChildCounters(const std::string& prefix,
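
The profile change routes both AddChild() and the new PrependChild() through the
single locked helper AddChildLocked(), so the by-name child map and the ordered child
vector cannot drift apart. A standalone sketch of that shape follows; the toy types,
the string children and the example names are invented for illustration, and the real
class additionally holds children_lock_ and stores RuntimeProfile pointers.

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy version of RuntimeProfile's child bookkeeping: an ordered vector of
// (child, indent) pairs plus a by-name map that must stay in sync with it.
struct ToyProfile {
  typedef std::vector<std::pair<std::string, bool> > ChildVector;
  ChildVector children;
  std::map<std::string, bool> child_map;

  // Mirrors AddChildLocked(): one shared insertion path, so the duplicate
  // check and the map update can never be skipped by a caller.
  void AddChildLocked(const std::string& name, bool indent,
                      ChildVector::iterator insert_pos) {
    if (child_map.count(name) > 0) return;  // already added: keep map and vector in sync
    child_map[name] = indent;
    children.insert(insert_pos, std::make_pair(name, indent));
  }

  void AddChild(const std::string& name, bool indent = true) {
    AddChildLocked(name, indent, children.end());
  }

  // The new entry point: same bookkeeping, but the child goes first.
  void PrependChild(const std::string& name, bool indent = true) {
    AddChildLocked(name, indent, children.begin());
  }
};

int main() {
  ToyProfile p;
  p.AddChild("HASH_JOIN_NODE");
  p.PrependChild("Hash Join Builder");  // illustrative names only
  for (size_t i = 0; i < p.children.size(); ++i) {
    std::cout << p.children[i].first << "\n";  // builder prints first
  }
  return 0;
}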

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/tests/stress/concurrent_select.py
----------------------------------------------------------------------
diff --git a/tests/stress/concurrent_select.py b/tests/stress/concurrent_select.py
index a43b402..1d1d142 100755
--- a/tests/stress/concurrent_select.py
+++ b/tests/stress/concurrent_select.py
@@ -692,7 +692,7 @@ class Query(object):
 class QueryRunner(object):
   """Encapsulates functionality to run a query and provide a runtime report."""
 
-  SPILLED_PATTERN = re.compile("ExecOption:.*Spilled")
+  SPILLED_PATTERNS = [re.compile("ExecOption:.*Spilled"), re.compile("SpilledRuns: [^0]")]
   BATCH_SIZE = 1024
 
   def __init__(self):
@@ -765,8 +765,8 @@ class QueryRunner(object):
           # Producing a query profile can be somewhat expensive. A v-tune profile of
           # impalad showed 10% of cpu time spent generating query profiles.
           report.profile = cursor.get_profile()
-          report.mem_was_spilled = \
-              QueryRunner.SPILLED_PATTERN.search(report.profile) is not None
+          report.mem_was_spilled = any([pattern.search(report.profile) is not None
+              for pattern in QueryRunner.SPILLED_PATTERNS])
     except Exception as error:
       # A mem limit error would have been caught above, no need to check for that here.
       report.non_mem_limit_error = error
