IMPALA-3286: prefetching for PartitionedAggregationNode

This patch builds on top of the prefetching infrastructure to add
prefetching to PartitionedAggregationNode. Input batches are evaluated
in prefetch groups, each sized to the capacity of the expression values
cache, and hash table buckets are prefetched if the prefetch_mode query
option is set to HT_BUCKET.

We avoid some pointer indirections on the critical path by caching hash
tables in a 'hash_tbls_' array.

There is also a bit of cleanup: the templated ProcessBatch() method is
now instantiated directly, which removes the ProcessBatch_true() and
ProcessBatch_false() wrapper hack, and ProcessBatchNoGrouping() is
separated out so that it no longer needs the same argument list as
ProcessBatch().

Co-author: Michael Ho <[email protected]>

Change-Id: I7726454efb416d61080c4e11db0ee7ada18c149b
Reviewed-on: http://gerrit.cloudera.org:8080/3070
Reviewed-by: Michael Ho <[email protected]>
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9f4276ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9f4276ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9f4276ee

Branch: refs/heads/master
Commit: 9f4276eea80a2e162a3045cefcc38f936df271d4
Parents: 9172f4b
Author: Tim Armstrong <[email protected]>
Authored: Mon May 16 13:58:13 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Tue May 17 10:09:06 2016 -0700

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py          |  12 +-
 be/src/exec/hash-table.cc                      |   9 +-
 be/src/exec/partitioned-aggregation-node-ir.cc | 152 +++++++++++++-------
 be/src/exec/partitioned-aggregation-node.cc    |  99 ++++++++-----
 be/src/exec/partitioned-aggregation-node.h     |  92 +++++++-----
 5 files changed, 230 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py 
b/be/src/codegen/gen_ir_descriptions.py
index 71fb1a7..5cc992c 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -40,10 +40,14 @@ options, args = parser.parse_args()
 ir_functions = [
   ["AGG_NODE_PROCESS_ROW_BATCH_WITH_GROUPING", "ProcessRowBatchWithGrouping"],
   ["AGG_NODE_PROCESS_ROW_BATCH_NO_GROUPING", "ProcessRowBatchNoGrouping"],
-  ["PART_AGG_NODE_PROCESS_BATCH_TRUE", "ProcessBatch_true"],
-  ["PART_AGG_NODE_PROCESS_BATCH_FALSE", "ProcessBatch_false"],
-  ["PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING", "ProcessBatchNoGrouping"],
-  ["PART_AGG_NODE_PROCESS_BATCH_STREAMING", "ProcessBatchStreaming"],
+  ["PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED",
+      "PartitionedAggregationNode12ProcessBatchILb0"],
+  ["PART_AGG_NODE_PROCESS_BATCH_AGGREGATED",
+      "PartitionedAggregationNode12ProcessBatchILb1"],
+  ["PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING",
+      "PartitionedAggregationNode22ProcessBatchNoGrouping"],
+  ["PART_AGG_NODE_PROCESS_BATCH_STREAMING",
+      "PartitionedAggregationNode21ProcessBatchStreaming"],
   ["AVG_UPDATE_BIGINT", "9AvgUpdateIN10impala_udf9BigIntVal"],
   ["AVG_UPDATE_DOUBLE", "9AvgUpdateIN10impala_udf9DoubleVal"],
   ["AVG_UPDATE_TIMESTAMP", "TimestampAvgUpdate"],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 953ddce..719b600 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -854,9 +854,12 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
   Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
 
   // Load cur_expr_values_null_ into a LLVM pointer.
-  Value* cur_expr_values_null_ptr =
-      codegen->CastPtrToLlvmPtr(buffer_ptr_type, 
&expr_values_cache_.cur_expr_values_null_);
-  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+  Value* cur_expr_values_null = NULL;
+  if (stores_nulls_) {
+    Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(
+        buffer_ptr_type, &expr_values_cache_.cur_expr_values_null_);
+    cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+  }
 
   // Call GetHashSeed() to get seeds_[level_]
   Function* get_hash_seed_fn =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc 
b/be/src/exec/partitioned-aggregation-node-ir.cc
index d575c01..d479189 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -23,8 +23,7 @@
 
 using namespace impala;
 
-Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch,
-    const HashTableCtx* ht_ctx) { // 'ht_ctx' is unused
+Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch) {
   Tuple* output_tuple = singleton_output_tuple_;
   FOREACH_ROW(batch, 0, batch_iter) {
     UpdateTuple(&agg_fn_ctxs_[0], output_tuple, batch_iter.Get());
@@ -34,7 +33,7 @@ Status 
PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch,
 
 template<bool AGGREGATED_ROWS>
 Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
-    HashTableCtx* __restrict__ ht_ctx) {
+    TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) {
   DCHECK(!hash_partitions_.empty());
   DCHECK(!is_streaming_preagg_);
 
@@ -44,39 +43,78 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* 
batch,
   // TODO: Once we have a histogram with the number of rows per partition, we 
will have
   // accurate resize calls.
   RETURN_IF_ERROR(CheckAndResizeHashPartitions(batch->num_rows(), ht_ctx));
-  FOREACH_ROW(batch, 0, batch_iter) {
-    RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
+
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int cache_size = expr_vals_cache->capacity();
+  const int num_rows = batch->num_rows();
+  for (int group_start = 0; group_start < num_rows; group_start += cache_size) 
{
+    EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, 
prefetch_mode, ht_ctx);
+
+    FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
+      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
+      expr_vals_cache->NextRow();
+    }
+    DCHECK(expr_vals_cache->AtEnd());
   }
   return Status::OK();
 }
 
 template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
-    HashTableCtx* __restrict__ ht_ctx) {
-  if (AGGREGATED_ROWS) {
-    if (!ht_ctx->EvalAndHashBuild(row)) return Status::OK();
-  } else {
-    if (!ht_ctx->EvalAndHashProbe(row)) return Status::OK();
+void IR_ALWAYS_INLINE PartitionedAggregationNode::EvalAndHashPrefetchGroup(
+    RowBatch* batch, int start_row_idx, TPrefetchMode::type prefetch_mode,
+    HashTableCtx* ht_ctx) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int cache_size = expr_vals_cache->capacity();
+
+  expr_vals_cache->Reset();
+  FOREACH_ROW_LIMIT(batch, start_row_idx, cache_size, batch_iter) {
+    TupleRow* row = batch_iter.Get();
+    bool is_null;
+    if (AGGREGATED_ROWS) {
+      is_null = !ht_ctx->EvalAndHashBuild(row);
+    } else {
+      is_null = !ht_ctx->EvalAndHashProbe(row);
+    }
+    // Hoist lookups out of non-null branch to speed up non-null case.
+    const uint32_t hash = expr_vals_cache->ExprValuesHash();
+    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+    HashTable* hash_tbl = GetHashTable(partition_idx);
+    if (is_null) {
+      expr_vals_cache->SetRowNull();
+    } else if (prefetch_mode != TPrefetchMode::NONE) {
+      if (LIKELY(hash_tbl != NULL)) hash_tbl->PrefetchBucket<false>(hash);
+    }
+    expr_vals_cache->NextRow();
   }
 
-  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  expr_vals_cache->ResetForRead();
+}
+
+template<bool AGGREGATED_ROWS>
+Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
+    HashTableCtx* __restrict__ ht_ctx) {
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  // Hoist lookups out of non-null branch to speed up non-null case.
+  const uint32_t hash = expr_vals_cache->ExprValuesHash();
+  const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+  if (expr_vals_cache->IsRowNull()) return Status::OK();
   // To process this row, we first see if it can be aggregated or inserted 
into this
   // partition's hash table. If we need to insert it and that fails, due to 
OOM, we
   // spill the partition. The partition to spill is not necessarily 
dst_partition,
   // so we can try again to insert the row.
-  Partition* dst_partition = hash_partitions_[hash >> (32 - 
NUM_PARTITIONING_BITS)];
-  if (dst_partition->is_spilled()) {
+  HashTable* hash_tbl = GetHashTable(partition_idx);
+  Partition* dst_partition = hash_partitions_[partition_idx];
+  DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
+  if (hash_tbl == NULL) {
     // This partition is already spilled, just append the row.
     return AppendSpilledRow<AGGREGATED_ROWS>(dst_partition, row);
   }
 
-  HashTable* ht = dst_partition->hash_tbl.get();
-  DCHECK(ht != NULL);
   DCHECK(dst_partition->aggregated_row_stream->is_pinned());
   bool found;
   // Find the appropriate bucket in the hash table. There will always be a free
   // bucket because we checked the size above.
-  HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, &found);
+  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
   DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
   if (AGGREGATED_ROWS) {
     // If the row is already an aggregate row, it cannot match anything in the
@@ -130,50 +168,47 @@ Status 
PartitionedAggregationNode::AppendSpilledRow(Partition* __restrict__ part
   return AppendSpilledRow(stream, row);
 }
 
-Status PartitionedAggregationNode::ProcessBatch_false(
-    RowBatch* batch, HashTableCtx* __restrict__ ht_ctx) {
-  return ProcessBatch<false>(batch, ht_ctx);
-}
-
-Status PartitionedAggregationNode::ProcessBatch_true(
-    RowBatch* batch, HashTableCtx* __restrict__ ht_ctx) {
-  return ProcessBatch<true>(batch, ht_ctx);
-}
-
 Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
-    RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx,
-    int remaining_capacity[PARTITION_FANOUT]) {
+    TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
+    HashTableCtx* __restrict__ ht_ctx, int 
remaining_capacity[PARTITION_FANOUT]) {
   DCHECK(is_streaming_preagg_);
   DCHECK_EQ(out_batch->num_rows(), 0);
   DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
 
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
-  FOREACH_ROW(in_batch, 0, in_batch_iter) {
-    TupleRow* in_row = in_batch_iter.Get();
-    if (!ht_ctx->EvalAndHashProbe(in_row)) continue;
-    uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
-    const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-
-    if (TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], in_row, 
hash,
-            &remaining_capacity[partition_idx], &process_batch_status_)) {
-      continue;
-    }
-    RETURN_IF_ERROR(process_batch_status_);
+  HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+  const int num_rows = in_batch->num_rows();
+  const int cache_size = expr_vals_cache->capacity();
+  for (int group_start = 0; group_start < num_rows; group_start += cache_size) 
{
+    EvalAndHashPrefetchGroup<false>(in_batch, group_start, prefetch_mode, 
ht_ctx);
 
-    // Tuple is not going into hash table, add it to the output batch.
-    Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_,
-        out_batch->tuple_data_pool(), &process_batch_status_);
-    if (UNLIKELY(intermediate_tuple == NULL)) {
-      DCHECK(!process_batch_status_.ok());
-      return process_batch_status_;
+    FOREACH_ROW_LIMIT(in_batch, group_start, cache_size, in_batch_iter) {
+      // Hoist lookups out of non-null branch to speed up non-null case.
+      TupleRow* in_row = in_batch_iter.Get();
+      const uint32_t hash = expr_vals_cache->ExprValuesHash();
+      const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+      if (!expr_vals_cache->IsRowNull() &&
+          !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
+            GetHashTable(partition_idx), in_row, hash, 
&remaining_capacity[partition_idx],
+            &process_batch_status_)) {
+        RETURN_IF_ERROR(process_batch_status_);
+        // Tuple is not going into hash table, add it to the output batch.
+        Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_,
+            out_batch->tuple_data_pool(), &process_batch_status_);
+        if (UNLIKELY(intermediate_tuple == NULL)) {
+          DCHECK(!process_batch_status_.ok());
+          return process_batch_status_;
+        }
+        UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row, false);
+        out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
+        out_batch_iterator.Next();
+        out_batch->CommitLastRow();
+      }
+      DCHECK(process_batch_status_.ok());
+      expr_vals_cache->NextRow();
     }
-    UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row, false);
-
-    out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);
-    out_batch_iterator.Next();
-    out_batch->CommitLastRow();
+    DCHECK(expr_vals_cache->AtEnd());
   }
-
   if (needs_serialize) {
     FOREACH_ROW(out_batch, 0, out_batch_iter) {
       AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs_,
@@ -186,13 +221,14 @@ Status 
PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
 
 bool PartitionedAggregationNode::TryAddToHashTable(
     HashTableCtx* __restrict__ ht_ctx, Partition* __restrict__ partition,
-    TupleRow* __restrict__ in_row, uint32_t hash, int* __restrict__ 
remaining_capacity,
-    Status* status) {
+    HashTable* __restrict__ hash_tbl, TupleRow* __restrict__ in_row,
+    uint32_t hash, int* __restrict__ remaining_capacity, Status* status) {
   DCHECK(remaining_capacity != NULL);
+  DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
   DCHECK_GE(*remaining_capacity, 0);
   bool found;
   // This is called from ProcessBatchStreaming() so the rows are not 
aggregated.
-  HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, 
&found);
+  HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);
   Tuple* intermediate_tuple;
   if (found) {
     intermediate_tuple = it.GetTuple();
@@ -214,3 +250,9 @@ bool PartitionedAggregationNode::TryAddToHashTable(
   UpdateTuple(&partition->agg_fn_ctxs[0], intermediate_tuple, in_row, false);
   return true;
 }
+
+// Instantiate required templates.
+template Status PartitionedAggregationNode::ProcessBatch<false>(RowBatch*,
+    TPrefetchMode::type, HashTableCtx*);
+template Status PartitionedAggregationNode::ProcessBatch<true>(RowBatch*,
+    TPrefetchMode::type, HashTableCtx*);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc 
b/be/src/exec/partitioned-aggregation-node.cc
index 2cde059..0a6a811 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -109,7 +109,8 @@ PartitionedAggregationNode::PartitionedAggregationNode(
     needs_serialize_(false),
     block_mgr_client_(NULL),
     output_partition_(NULL),
-    process_row_batch_fn_(NULL),
+    process_batch_no_grouping_fn_(NULL),
+    process_batch_fn_(NULL),
     process_batch_streaming_fn_(NULL),
     build_timer_(NULL),
     ht_resize_timer_(NULL),
@@ -274,25 +275,9 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* 
state) {
   bool codegen_enabled = false;
   Status codegen_status;
   if (state->codegen_enabled()) {
-    LlvmCodeGen* codegen;
-    RETURN_IF_ERROR(state->GetCodegen(&codegen));
-    if (is_streaming_preagg_) {
-      Function* codegen_process_batch_streaming_fn;
-      codegen_status = 
CodegenProcessBatchStreaming(&codegen_process_batch_streaming_fn);
-      if (codegen_status.ok()) {
-        codegen->AddFunctionToJit(codegen_process_batch_streaming_fn,
-            reinterpret_cast<void**>(&process_batch_streaming_fn_));
-        codegen_enabled = true;
-      }
-    } else {
-      Function* codegen_process_row_batch_fn;
-      codegen_status = CodegenProcessBatch(&codegen_process_row_batch_fn);
-      if (codegen_status.ok()) {
-        codegen->AddFunctionToJit(codegen_process_row_batch_fn,
-            reinterpret_cast<void**>(&process_row_batch_fn_));
-        codegen_enabled = true;
-      }
-    }
+    codegen_status = is_streaming_preagg_ ? CodegenProcessBatchStreaming()
+                                          : CodegenProcessBatch();
+    codegen_enabled = codegen_status.ok();
   }
   AddCodegenExecOption(codegen_enabled, codegen_status);
   return Status::OK();
@@ -330,14 +315,21 @@ Status PartitionedAggregationNode::Open(RuntimeState* 
state) {
       }
     }
 
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     SCOPED_TIMER(build_timer_);
-    if (process_row_batch_fn_ != NULL) {
-      RETURN_IF_ERROR(process_row_batch_fn_(this, &batch, ht_ctx_.get()));
-    } else if (grouping_expr_ctxs_.empty()) {
-      RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
+    if (grouping_expr_ctxs_.empty()) {
+      if (process_batch_no_grouping_fn_ != NULL) {
+        RETURN_IF_ERROR(process_batch_no_grouping_fn_(this, &batch));
+      } else {
+        RETURN_IF_ERROR(ProcessBatchNoGrouping(&batch));
+      }
     } else {
       // There is grouping, so we will do partitioned aggregation.
-      RETURN_IF_ERROR(ProcessBatch<false>(&batch, ht_ctx_.get()));
+      if (process_batch_fn_ != NULL) {
+        RETURN_IF_ERROR(process_batch_fn_(this, &batch, prefetch_mode, 
ht_ctx_.get()));
+      } else {
+        RETURN_IF_ERROR(ProcessBatch<false>(&batch, prefetch_mode, 
ht_ctx_.get()));
+      }
     }
     batch.Reset();
   } while (!eos);
@@ -542,8 +534,9 @@ Status 
PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     int remaining_capacity[PARTITION_FANOUT];
     bool ht_needs_expansion = false;
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
-      DCHECK(hash_partitions_[i]->hash_tbl != NULL);
-      remaining_capacity[i] = 
hash_partitions_[i]->hash_tbl->NumInsertsBeforeResize();
+      HashTable* hash_tbl = GetHashTable(i);
+      DCHECK(hash_tbl != NULL);
+      remaining_capacity[i] = hash_tbl->NumInsertsBeforeResize();
       ht_needs_expansion |= remaining_capacity[i] < child_batch_->num_rows();
     }
 
@@ -554,7 +547,7 @@ Status 
PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
     // should always use the remaining space in the hash table to avoid 
wasting memory.
     if (ht_needs_expansion && ShouldExpandPreaggHashTables()) {
       for (int i = 0; i < PARTITION_FANOUT; ++i) {
-        HashTable* ht = hash_partitions_[i]->hash_tbl.get();
+        HashTable* ht = GetHashTable(i);
         if (remaining_capacity[i] < child_batch_->num_rows()) {
           SCOPED_TIMER(ht_resize_timer_);
           if (ht->CheckAndResize(child_batch_->num_rows(), ht_ctx_.get())) {
@@ -564,12 +557,13 @@ Status 
PartitionedAggregationNode::GetRowsStreaming(RuntimeState* state,
       }
     }
 
+    TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
     if (process_batch_streaming_fn_ != NULL) {
-      RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_,
+      RETURN_IF_ERROR(process_batch_streaming_fn_(this, needs_serialize_, 
prefetch_mode,
           child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity));
     } else {
-      RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, 
child_batch_.get(),
-            out_batch, ht_ctx_.get(), remaining_capacity ));
+      RETURN_IF_ERROR(ProcessBatchStreaming(needs_serialize_, prefetch_mode,
+          child_batch_.get(), out_batch, ht_ctx_.get(), remaining_capacity ));
     }
 
     child_batch_->Reset(); // All rows from child_batch_ were processed.
@@ -1145,6 +1139,7 @@ Status 
PartitionedAggregationNode::CreateHashPartitions(int level) {
     DCHECK(new_partition != NULL);
     hash_partitions_.push_back(partition_pool_->Add(new_partition));
     RETURN_IF_ERROR(new_partition->InitStreams());
+    hash_tbls_[i] = NULL;
   }
   if (!is_streaming_preagg_) {
     
DCHECK_GT(state_->block_mgr()->num_reserved_buffers_remaining(block_mgr_client_),
 0);
@@ -1166,6 +1161,7 @@ Status 
PartitionedAggregationNode::CreateHashPartitions(int level) {
       }
       RETURN_IF_ERROR(hash_partitions_[i]->Spill());
     }
+    hash_tbls_[i] = hash_partitions_[i]->hash_tbl.get();
   }
 
   COUNTER_ADD(partitions_created_, hash_partitions_.size());
@@ -1293,12 +1289,14 @@ Status 
PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre
       RETURN_IF_ERROR(SpillPartition());
     }
 
+    TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
     bool eos = false;
     RowBatch batch(AGGREGATED_ROWS ? *intermediate_row_desc_ : 
children_[0]->row_desc(),
                    state_->batch_size(), mem_tracker());
     do {
       RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
-      RETURN_IF_ERROR(ProcessBatch<AGGREGATED_ROWS>(&batch, ht_ctx_.get()));
+      RETURN_IF_ERROR(
+          ProcessBatch<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get()));
       RETURN_IF_ERROR(state_->GetQueryStatus());
       FreeLocalAllocations();
       batch.Reset();
@@ -1331,6 +1329,7 @@ Status PartitionedAggregationNode::SpillPartition() {
     return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
   }
 
+  hash_tbls_[partition_idx] = NULL;
   return hash_partitions_[partition_idx]->Spill();
 }
 
@@ -1398,6 +1397,7 @@ void PartitionedAggregationNode::ClosePartitions() {
   aggregated_partitions_.clear();
   spilled_partitions_.clear();
   hash_partitions_.clear();
+  memset(hash_tbls_, 0, sizeof(hash_tbls_));
   partition_pool_->Clear();
 }
 
@@ -1792,7 +1792,7 @@ Status 
PartitionedAggregationNode::CodegenUpdateTuple(Function** fn) {
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
+Status PartitionedAggregationNode::CodegenProcessBatch() {
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state_->GetCodegen(&codegen));
   SCOPED_TIMER(codegen->codegen_timer());
@@ -1802,7 +1802,7 @@ Status 
PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
 
   // Get the cross compiled update row batch function
   IRFunction::Type ir_fn = (!grouping_expr_ctxs_.empty() ?
-      IRFunction::PART_AGG_NODE_PROCESS_BATCH_FALSE :
+      IRFunction::PART_AGG_NODE_PROCESS_BATCH_UNAGGREGATED :
       IRFunction::PART_AGG_NODE_PROCESS_BATCH_NO_GROUPING);
   Function* process_batch_fn = codegen->GetFunction(ir_fn, true);
   DCHECK(process_batch_fn != NULL);
@@ -1810,6 +1810,13 @@ Status 
PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
   int replaced;
   if (!grouping_expr_ctxs_.empty()) {
     // Codegen for grouping using hash table
+
+    // Replace prefetch_mode with constant so branches can be optimised out.
+    TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
+    Value* prefetch_mode_arg = codegen->GetArgument(process_batch_fn, 3);
+    prefetch_mode_arg->replaceAllUsesWith(
+        ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
     // The codegen'd ProcessBatch function is only used in Open() with level_ 
= 0,
     // so don't use murmur hash
     Function* hash_fn;
@@ -1838,15 +1845,20 @@ Status 
PartitionedAggregationNode::CodegenProcessBatch(Function** fn) {
 
   replaced = codegen->ReplaceCallSites(process_batch_fn, update_tuple_fn, 
"UpdateTuple");
   DCHECK_GE(replaced, 1);
-  *fn = codegen->FinalizeFunction(process_batch_fn);
-  if (*fn == NULL) {
+  process_batch_fn = codegen->FinalizeFunction(process_batch_fn);
+  if (process_batch_fn == NULL) {
     return Status("PartitionedAggregationNode::CodegenProcessBatch(): 
codegen'd "
         "ProcessBatch() function failed verification, see log");
   }
+
+  void **codegened_fn_ptr = grouping_expr_ctxs_.empty() ?
+      reinterpret_cast<void**>(&process_batch_no_grouping_fn_) :
+      reinterpret_cast<void**>(&process_batch_fn_);
+  codegen->AddFunctionToJit(process_batch_fn, codegened_fn_ptr);
   return Status::OK();
 }
 
-Status PartitionedAggregationNode::CodegenProcessBatchStreaming(Function** fn) 
{
+Status PartitionedAggregationNode::CodegenProcessBatchStreaming() {
   DCHECK(is_streaming_preagg_);
   LlvmCodeGen* codegen;
   RETURN_IF_ERROR(state_->GetCodegen(&codegen));
@@ -1861,6 +1873,12 @@ Status 
PartitionedAggregationNode::CodegenProcessBatchStreaming(Function** fn) {
   needs_serialize_arg->replaceAllUsesWith(
       ConstantInt::get(Type::getInt1Ty(codegen->context()), needs_serialize_));
 
+  // Replace prefetch_mode with constant so branches can be optimised out.
+  TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
+  Value* prefetch_mode_arg = codegen->GetArgument(process_batch_streaming_fn, 
3);
+  prefetch_mode_arg->replaceAllUsesWith(
+      ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
   Function* update_tuple_fn;
   RETURN_IF_ERROR(CodegenUpdateTuple(&update_tuple_fn));
 
@@ -1893,11 +1911,14 @@ Status 
PartitionedAggregationNode::CodegenProcessBatchStreaming(Function** fn) {
   DCHECK_EQ(replaced, 1);
 
   DCHECK(process_batch_streaming_fn != NULL);
-  *fn = codegen->FinalizeFunction(process_batch_streaming_fn);
-  if (*fn == NULL) {
+  process_batch_streaming_fn = 
codegen->FinalizeFunction(process_batch_streaming_fn);
+  if (process_batch_streaming_fn == NULL) {
     return Status("PartitionedAggregationNode::CodegenProcessBatchStreaming(): 
codegen'd "
         "ProcessBatchStreaming() function failed verification, see log");
   }
+
+  codegen->AddFunctionToJit(process_batch_streaming_fn,
+      reinterpret_cast<void**>(&process_batch_streaming_fn_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f4276ee/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h 
b/be/src/exec/partitioned-aggregation-node.h
index ab560c5..73976c2 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -228,13 +228,17 @@ class PartitionedAggregationNode : public ExecNode {
   Partition* output_partition_;
   HashTable::Iterator output_iterator_;
 
-  typedef Status (*ProcessRowBatchFn)(
-      PartitionedAggregationNode*, RowBatch*, HashTableCtx*);
-  /// Jitted ProcessRowBatch function pointer.  Null if codegen is disabled.
-  ProcessRowBatchFn process_row_batch_fn_;
+  typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, 
RowBatch*);
+  /// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is 
disabled.
+  ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
 
-  typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool, 
RowBatch*,
-      RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]);
+  typedef Status (*ProcessBatchFn)(
+      PartitionedAggregationNode*, RowBatch*, TPrefetchMode::type, 
HashTableCtx*);
+  /// Jitted ProcessBatch function pointer. Null if codegen is disabled.
+  ProcessBatchFn process_batch_fn_;
+
+  typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
+      TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, 
int[PARTITION_FANOUT]);
   /// Jitted ProcessBatchStreaming function pointer.  Null if codegen is 
disabled.
   ProcessBatchStreamingFn process_batch_streaming_fn_;
 
@@ -312,6 +316,9 @@ class PartitionedAggregationNode : public ExecNode {
   /// Current partitions we are partitioning into.
   std::vector<Partition*> hash_partitions_;
 
+  /// Cache for hash tables in 'hash_partitions_'.
+  HashTable* hash_tbls_[PARTITION_FANOUT];
+
   /// All partitions that have been spilled and need further processing.
   std::list<Partition*> spilled_partitions_;
 
@@ -390,6 +397,13 @@ class PartitionedAggregationNode : public ExecNode {
   /// a temporary buffer.
   boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
 
+  /// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
+  HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
+    HashTable* ht = hash_tbls_[partition_idx];
+    DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get());
+    return ht;
+  }
+
   /// Materializes 'row_batch' in either grouping or non-grouping case.
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
 
@@ -465,19 +479,31 @@ class PartitionedAggregationNode : public ExecNode {
                         Tuple* tuple, MemPool* pool);
 
   /// Do the aggregation for all tuple rows in the batch when there is no 
grouping.
-  /// The HashTableCtx argument is unused, but included so the signature 
matches that of
-  /// ProcessBatch() for codegen. This function is replaced by codegen.
-  Status ProcessBatchNoGrouping(RowBatch* batch, const HashTableCtx* ht_ctx = 
NULL);
+  /// This function is replaced by codegen.
+  Status ProcessBatchNoGrouping(RowBatch* batch);
 
   /// Processes a batch of rows. This is the core function of the algorithm. 
We partition
   /// the rows into hash_partitions_, spilling as necessary.
   /// If AGGREGATED_ROWS is true, it means that the rows in the batch are 
already
   /// pre-aggregated.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not
+  ///     TPrefetchMode::NONE, hash table buckets will be prefetched based on the hash
+  ///     values computed. Note that codegen replaces 'prefetch_mode' with a constant.
   //
-  /// This function is replaced by codegen. It's inlined into 
ProcessBatch_true/false in
-  /// the IR module. We pass in ht_ctx_.get() as an argument for performance.
+  /// This function is replaced by codegen. We pass in ht_ctx_.get() as an 
argument for
+  /// performance.
+  template<bool AGGREGATED_ROWS>
+  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch,
+      TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
+
+  /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the 
results in
+  /// the expression values cache in 'ht_ctx'. The number of rows evaluated 
depends on
+  /// the capacity of the cache. 'prefetch_mode' specifies the prefetching 
mode in use.
+  /// If it's not TPrefetchMode::NONE, hash table buckets for the computed hashes will be
+  /// prefetched. Note that codegen replaces 'prefetch_mode' with a constant.
   template<bool AGGREGATED_ROWS>
-  Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, HashTableCtx* ht_ctx);
+  void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx,
+      TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
 
   /// This function processes each individual row in ProcessBatch(). Must be 
inlined into
   /// ProcessBatch for codegen to substitute function calls with codegen'd 
versions.
@@ -532,25 +558,28 @@ class PartitionedAggregationNode : public ExecNode {
   /// store all of the rows in 'in_batch'.
   /// 'needs_serialize' is an argument so that codegen can replace it with a 
constant,
   ///     rather than using the member variable 'needs_serialize_'.
+  /// 'prefetch_mode' specifies the prefetching mode in use. If it's not 
PREFETCH_NONE,
+  ///     hash table buckets will be prefetched based on the hash values 
computed. Note
+  ///     that 'prefetch_mode' will be substituted with constants during 
codegen time.
   /// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the 
number of
   ///     additional rows that can be added to the hash table per partition. 
It is updated
   ///     by ProcessBatchStreaming() when it inserts new rows.
   /// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the 
optimiser.
-  Status ProcessBatchStreaming(bool needs_serialize, RowBatch* in_batch,
-      RowBatch* out_batch, HashTableCtx* ht_ctx,
+  Status ProcessBatchStreaming(bool needs_serialize, TPrefetchMode::type 
prefetch_mode,
+      RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
       int remaining_capacity[PARTITION_FANOUT]);
 
-  /// Tries to add intermediate to the hash table of 'partition' for streaming 
aggregation.
-  /// The input row must have been evaluated with 'ht_ctx', with 'hash' set to 
the
-  /// corresponding hash. If the tuple already exists in the hash table, 
update the tuple
-  /// and return true. Otherwise try to create a new entry in the hash table, 
returning
-  /// true if successful or false if the table is full. 'remaining_capacity' 
keeps track
-  /// of how many more entries can be added to the hash table so we can avoid 
retrying
-  /// inserts. It is decremented if an insert succeeds and set to zero if an 
insert
-  /// fails. If an error occurs, returns false and sets 'status'.
+  /// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' 
for streaming
+  /// aggregation. The input row must have been evaluated with 'ht_ctx', with 
'hash' set
+  /// to the corresponding hash. If the tuple already exists in the hash 
table, update
+  /// the tuple and return true. Otherwise try to create a new entry in the 
hash table,
+  /// returning true if successful or false if the table is full. 
'remaining_capacity'
+  /// keeps track of how many more entries can be added to the hash table so 
we can avoid
+  /// retrying inserts. It is decremented if an insert succeeds and set to 
zero if an
+  /// insert fails. If an error occurs, returns false and sets 'status'.
   bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx,
-      Partition* partition, TupleRow* in_row, uint32_t hash, int* 
remaining_capacity,
-      Status* status);
+      Partition* partition, HashTable* hash_tbl, TupleRow* in_row, uint32_t 
hash,
+      int* remaining_capacity, Status* status);
 
   /// Initializes hash_partitions_. 'level' is the level for the partitions to 
create.
   /// Also sets ht_ctx_'s level to 'level'.
@@ -601,18 +630,15 @@ class PartitionedAggregationNode : public ExecNode {
   /// Codegen the non-streaming process row batch loop. The loop has already 
been
   /// compiled to IR and loaded into the codegen object. UpdateAggTuple has 
also been
   /// codegen'd to IR. This function will modify the loop subsituting the 
statically
-  /// compiled functions with codegen'd ones.
+  /// compiled functions with codegen'd ones. 'process_batch_fn_' or
+  /// 'process_batch_no_grouping_fn_' will be updated with the codegened 
function
+  /// depending on whether this is a grouping or non-grouping aggregation.
   /// Assumes AGGREGATED_ROWS = false.
-  Status CodegenProcessBatch(llvm::Function** fn);
+  Status CodegenProcessBatch();
 
   /// Codegen the materialization loop for streaming preaggregations.
-  Status CodegenProcessBatchStreaming(llvm::Function** fn);
-
-  /// Functions to instantiate templated versions of ProcessBatch().
-  /// The xcompiled versions of these functions are used in 
CodegenProcessBatch().
-  /// TODO: is there a better way to do this?
-  Status ProcessBatch_false(RowBatch* batch, HashTableCtx* ht_ctx);
-  Status ProcessBatch_true(RowBatch* batch, HashTableCtx* ht_ctx);
+  /// 'process_batch_streaming_fn_' will be updated with the codegened 
function.
+  Status CodegenProcessBatchStreaming();
 
   /// We need two buffers per partition, one for the aggregated stream and one
   /// for the unaggregated stream. We need an additional buffer to read the 
stream

Reply via email to