IMPALA-3286: Prefetching for PHJ probing.

This change pipelines the code which probes the hash tables.
This is based on the idea which Mostafa presented earlier.
Essentially, all rows in a row batch will be evaluated and
hashed first before being probed against the hash tables.
Hash table buckets are prefetched as hash values of rows are
computed.

To avoid re-evaluating the rows again during probing (as the rows
have been evaluated once to compute the hash values), hash table
context has been updated to cache the evaluated expression values,
null bits and hash values of some number of rows. Hash table context
provies a new iterator like interface to iterate through the cached
values.

A PREFETCH_MODE query option has also been added to disable prefetching
if necessary. The default mode is 1 which means hash table buckets will
be prefetched. In the future, this mode may be extended to support hash
table buckets' data prefetching too.

Combined with the build side prefetching, a self join of table lineitem
improves by 40% on a single node run on average:

select count(*)
from lineitem o1, lineitem o2
where o1.l_orderkey = o2.l_orderkey and
      o1.l_linenumber = o2.l_linenumber;

Change-Id: Ib42b93d99d09c833571e39d20d58c11ef73f3cc0
Reviewed-on: http://gerrit.cloudera.org:8080/2959
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a59408b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a59408b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a59408b5

Branch: refs/heads/master
Commit: a59408b575ff6f48b8f0e1084ebcf7489c9de8af
Parents: b634a55
Author: Michael Ho <[email protected]>
Authored: Wed Apr 27 17:09:50 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Tue May 17 01:30:12 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-table-test.cc                  | 169 ++++---
 be/src/exec/hash-table.cc                       | 484 ++++++++++++++-----
 be/src/exec/hash-table.h                        | 334 +++++++++----
 be/src/exec/hash-table.inline.h                 |  60 +--
 be/src/exec/partitioned-aggregation-node-ir.cc  |  14 +-
 be/src/exec/partitioned-aggregation-node.cc     |  16 +-
 be/src/exec/partitioned-hash-join-node-ir.cc    | 344 ++++++++-----
 be/src/exec/partitioned-hash-join-node.cc       | 122 +++--
 be/src/exec/partitioned-hash-join-node.h        |  62 ++-
 be/src/exec/partitioned-hash-join-node.inline.h |   1 +
 be/src/exprs/expr-context.cc                    |  23 +-
 be/src/exprs/expr-context.h                     |  10 +-
 be/src/runtime/row-batch.h                      |  22 +-
 be/src/runtime/test-env.cc                      |   1 +
 be/src/service/query-options.cc                 |  11 +
 be/src/service/query-options.h                  |   5 +-
 be/src/udf/udf-internal.h                       |   3 +
 common/thrift/ImpalaInternalService.thrift      |   3 +
 common/thrift/ImpalaService.thrift              |   5 +-
 common/thrift/Types.thrift                      |   8 +
 20 files changed, 1177 insertions(+), 520 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 25cd4f1..7559473 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -145,13 +145,12 @@ class HashTableTest : public testing::Test {
 
   void ProbeTest(HashTable* table, HashTableCtx* ht_ctx,
       ProbeTestData* data, int num_data, bool scan) {
-    uint32_t hash = 0;
     for (int i = 0; i < num_data; ++i) {
       TupleRow* row = data[i].probe_row;
 
       HashTable::Iterator iter;
-      if (ht_ctx->EvalAndHashProbe(row, &hash)) continue;
-      iter = table->FindProbeRow(ht_ctx, hash);
+      if (ht_ctx->EvalAndHashProbe(row)) continue;
+      iter = table->FindProbeRow(ht_ctx);
 
       if (data[i].expected_build_rows.size() == 0) {
         EXPECT_TRUE(iter.AtEnd());
@@ -183,7 +182,6 @@ class HashTableTest : public testing::Test {
       int max_num_blocks = 100, int reserved_blocks = 10) {
     EXPECT_TRUE(test_env_->CreateQueryState(0, max_num_blocks, block_size,
         &runtime_state_).ok());
-
     BufferedBlockMgr::Client* client;
     EXPECT_TRUE(runtime_state_->block_mgr()->RegisterClient("", 
reserved_blocks, false,
         &tracker_, runtime_state_, &client).ok());
@@ -236,19 +234,21 @@ class HashTableTest : public testing::Test {
     // Create the hash table and insert the build rows
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, true /* 
stores_nulls_ */,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, true /* stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, 
&ht_ctx);
+    EXPECT_OK(status);
     for (int i = 0; i < 2; ++i) {
-      uint32_t hash = 0;
-      if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
+      if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
       BufferedTupleStream::RowIdx dummy_row_idx;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, 
build_rows[i], hash);
+      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, 
build_rows[i]);
       EXPECT_TRUE(inserted);
     }
     EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1);
     hash_table->Close();
+    ht_ctx->Close();
   }
 
   // This test inserts the build rows [0->5) to hash table. It validates that 
they
@@ -269,51 +269,52 @@ class HashTableTest : public testing::Test {
     // Create the hash table and insert the build rows
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-
-    uint32_t hash = 0;
-    bool success = hash_table->CheckAndResize(5, &ht_ctx);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, 
&ht_ctx);
+    EXPECT_OK(status);
+    bool success = hash_table->CheckAndResize(5, ht_ctx.get());
     ASSERT_TRUE(success);
     for (int i = 0; i < 5; ++i) {
-      if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
+      if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
       BufferedTupleStream::RowIdx dummy_row_idx;
       EXPECT_TRUE(hash_table->stores_tuples_);
-      bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, 
build_rows[i], hash);
+      bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, 
build_rows[i]);
       EXPECT_TRUE(inserted);
     }
     EXPECT_EQ(hash_table->size(), 5);
 
     // Do a full table scan and validate returned pointers
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, 
build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Double the size of the hash table and scan again.
-    ResizeTable(hash_table.get(), 2048, &ht_ctx);
+    ResizeTable(hash_table.get(), 2048, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 2048);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, 
build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Try to shrink and scan again.
-    ResizeTable(hash_table.get(), 64, &ht_ctx);
+    ResizeTable(hash_table.get(), 64, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 64);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, 
build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     // Resize to 8, which is the smallest value to fit the number of filled 
buckets.
-    ResizeTable(hash_table.get(), 8, &ht_ctx);
+    ResizeTable(hash_table.get(), 8, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), 8);
     EXPECT_EQ(hash_table->size(), 5);
     memset(scan_rows, 0, sizeof(scan_rows));
-    FullScan(hash_table.get(), &ht_ctx, 0, 5, true, scan_rows, build_rows);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, 10, false);
+    FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, 
build_rows);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
 
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   void ScanTest(bool quadratic, int initial_size, int rows_to_insert,
@@ -322,24 +323,26 @@ class HashTableTest : public testing::Test {
     ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table));
 
     int total_rows = rows_to_insert + additional_rows;
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, 
&ht_ctx);
+    EXPECT_OK(status);
 
     // Add 1 row with val 1, 2 with val 2, etc.
     vector<TupleRow*> build_rows;
     ProbeTestData* probe_rows = new ProbeTestData[total_rows];
     probe_rows[0].probe_row = CreateTupleRow(0);
-    uint32_t hash = 0;
     for (int val = 1; val <= rows_to_insert; ++val) {
-      bool success = hash_table->CheckAndResize(val, &ht_ctx);
+      bool success = hash_table->CheckAndResize(val, ht_ctx.get());
       EXPECT_TRUE(success) << " failed to resize: " << val;
       probe_rows[val].probe_row = CreateTupleRow(val);
       for (int i = 0; i < val; ++i) {
         TupleRow* row = CreateTupleRow(val);
-        if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
+        if (!ht_ctx->EvalAndHashBuild(row)) continue;
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         build_rows.push_back(row);
         probe_rows[val].expected_build_rows.push_back(row);
       }
@@ -351,22 +354,22 @@ class HashTableTest : public testing::Test {
     }
 
     // Test that all the builds were found.
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     // Resize and try again.
     int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
-    ResizeTable(hash_table.get(), target_size, &ht_ctx);
+    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
-    ResizeTable(hash_table.get(), target_size, &ht_ctx);
+    ResizeTable(hash_table.get(), target_size, ht_ctx.get());
     EXPECT_EQ(hash_table->num_buckets(), target_size);
-    ProbeTest(hash_table.get(), &ht_ctx, probe_rows, total_rows, true);
+    ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
 
     delete [] probe_rows;
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test continues adding tuples to the hash table and exercises the 
resize code
@@ -378,27 +381,29 @@ class HashTableTest : public testing::Test {
     MemTracker tracker(100 * 1024 * 1024);
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, 
&ht_ctx);
+    EXPECT_OK(status);
 
     // Inserts num_to_add + (num_to_add^2) + (num_to_add^4) + ... + 
(num_to_add^20)
     // entries. When num_to_add == 4, then the total number of inserts is 
4194300.
     int build_row_val = 0;
-    uint32_t hash = 0;
     for (int i = 0; i < 20; ++i) {
       // Currently the mem used for the bucket is not being tracked by the mem 
tracker.
       // Thus the resize is expected to be successful.
       // TODO: Keep track of the mem used for the buckets and test cases where 
we actually
       // hit OOM.
       // TODO: Insert duplicates to also hit OOM.
-      bool success = hash_table->CheckAndResize(num_to_add, &ht_ctx);
+      bool success = hash_table->CheckAndResize(num_to_add, ht_ctx.get());
       EXPECT_TRUE(success) << " failed to resize: " << num_to_add;
       for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
         TupleRow* row = CreateTupleRow(build_row_val);
-        if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
+        if (!ht_ctx->EvalAndHashBuild(row)) continue;
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         if (!inserted) goto done_inserting;
       }
       expected_size += num_to_add;
@@ -410,8 +415,8 @@ class HashTableTest : public testing::Test {
     // Validate that we can find the entries before we went over the limit
     for (int i = 0; i < expected_size * 5; i += 100000) {
       TupleRow* probe_row = CreateTupleRow(i);
-      if (!ht_ctx.EvalAndHashProbe(probe_row, &hash)) continue;
-      HashTable::Iterator iter = hash_table->FindProbeRow(&ht_ctx, hash);
+      if (!ht_ctx->EvalAndHashProbe(probe_row)) continue;
+      HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get());
       if (i < hash_table->size()) {
         EXPECT_TRUE(!iter.AtEnd()) << " i: " << i;
         ValidateMatch(probe_row, iter.GetRow());
@@ -420,7 +425,7 @@ class HashTableTest : public testing::Test {
       }
     }
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test inserts and probes as many elements as the size of the hash 
table without
@@ -430,9 +435,12 @@ class HashTableTest : public testing::Test {
   void InsertFullTest(bool quadratic, int table_size) {
     scoped_ptr<HashTable> hash_table;
     ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
     EXPECT_EQ(hash_table->EmptyBuckets(), table_size);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, 
&ht_ctx);
+    EXPECT_OK(status);
 
     // Insert and probe table_size different tuples. All of them are expected 
to be
     // successfully inserted and probed.
@@ -441,30 +449,32 @@ class HashTableTest : public testing::Test {
     bool found;
     for (int build_row_val = 0; build_row_val < table_size; ++build_row_val) {
       TupleRow* row = CreateTupleRow(build_row_val);
-      bool passes = ht_ctx.EvalAndHashBuild(row, &hash);
+      bool passes = ht_ctx->EvalAndHashBuild(row);
+      hash = ht_ctx->expr_values_cache()->ExprValuesHash();
       EXPECT_TRUE(passes);
 
       // Insert using both Insert() and FindBucket() methods.
       if (build_row_val % 2 == 0) {
         BufferedTupleStream::RowIdx dummy_row_idx;
         EXPECT_TRUE(hash_table->stores_tuples_);
-        bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
+        bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
         EXPECT_TRUE(inserted);
       } else {
-        iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+        iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
         EXPECT_FALSE(iter.AtEnd());
         EXPECT_FALSE(found);
         iter.SetTuple(row->GetTuple(0), hash);
       }
       EXPECT_EQ(hash_table->EmptyBuckets(), table_size - build_row_val - 1);
 
-      passes = ht_ctx.EvalAndHashProbe(row, &hash);
+      passes = ht_ctx->EvalAndHashProbe(row);
+      hash = ht_ctx->expr_values_cache()->ExprValuesHash();
       EXPECT_TRUE(passes);
-      iter = hash_table->FindProbeRow(&ht_ctx, hash);
+      iter = hash_table->FindProbeRow(ht_ctx.get());
       EXPECT_FALSE(iter.AtEnd());
       EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
 
-      iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+      iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
       EXPECT_FALSE(iter.AtEnd());
       EXPECT_TRUE(found);
       EXPECT_EQ(row->GetTuple(0), iter.GetTuple());
@@ -474,18 +484,18 @@ class HashTableTest : public testing::Test {
     // hash table code path.
     EXPECT_EQ(hash_table->EmptyBuckets(), 0);
     TupleRow* probe_row = CreateTupleRow(table_size);
-    bool passes = ht_ctx.EvalAndHashProbe(probe_row, &hash);
+    bool passes = ht_ctx->EvalAndHashProbe(probe_row);
     EXPECT_TRUE(passes);
-    iter = hash_table->FindProbeRow(&ht_ctx, hash);
+    iter = hash_table->FindProbeRow(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
 
     // Since hash_table is full, FindBucket cannot find an empty bucket, so 
returns End().
-    iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
+    iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
     EXPECT_TRUE(iter.AtEnd());
     EXPECT_FALSE(found);
 
     hash_table->Close();
-    ht_ctx.Close();
+    ht_ctx->Close();
   }
 
   // This test makes sure we can tolerate the low memory case where we do not 
have enough
@@ -498,12 +508,15 @@ class HashTableTest : public testing::Test {
     scoped_ptr<HashTable> hash_table;
     ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, 
block_size,
           max_num_blocks, reserved_blocks));
-    HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-        std::vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1);
-    HashTable::Iterator iter = hash_table->Begin(&ht_ctx);
+    scoped_ptr<HashTableCtx> ht_ctx;
+    Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+        probe_expr_ctxs_, false /* !stores_nulls_ */,
+        vector<bool>(build_expr_ctxs_.size(), false), 1, 0, 1, &tracker_, 
&ht_ctx);
+    EXPECT_OK(status);
+    HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
     EXPECT_TRUE(iter.AtEnd());
-
     hash_table->Close();
+    ht_ctx->Close();
   }
 };
 
@@ -582,17 +595,23 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
-  HashTableCtx ht_ctx(build_expr_ctxs_, probe_expr_ctxs_, false,
-      std::vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1);
+  EXPECT_TRUE(test_env_->CreateQueryState(0, 100, 8 * 1024 * 1024,
+      &runtime_state_).ok());
+  scoped_ptr<HashTableCtx> ht_ctx;
+  Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
+      probe_expr_ctxs_, false /* !stores_nulls_ */,
+      vector<bool>(build_expr_ctxs_.size(), false), 1, 2, 1, &tracker_, 
&ht_ctx);
+  EXPECT_OK(status);
+
   uint32_t seed = 9999;
-  ht_ctx.set_level(0);
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, seed));
+  ht_ctx->set_level(0);
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed));
   // TODO: level 0 uses CRC hash, which only swaps bytes around on empty input.
-  // EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, ht_ctx.Hash(NULL, 0, seed)));
-  ht_ctx.set_level(1);
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, seed));
-  EXPECT_NE(seed, ht_ctx.Hash(NULL, 0, ht_ctx.Hash(NULL, 0, seed)));
-  ht_ctx.Close();
+  // EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed)));
+  ht_ctx->set_level(1);
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, seed));
+  EXPECT_NE(seed, ht_ctx->Hash(NULL, 0, ht_ctx->Hash(NULL, 0, seed)));
+  ht_ctx.get()->Close();
 }
 
 TEST_F(HashTableTest, VeryLowMemTest) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 9cab765..953ddce 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -84,8 +84,8 @@ static const int NUM_SMALL_DATA_PAGES = 
sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof
 
 HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
     const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
-    const std::vector<bool>& finds_nulls, int32_t initial_seed,
-    int max_levels, int num_build_tuples)
+    const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+    MemTracker* tracker)
     : build_expr_ctxs_(build_expr_ctxs),
       probe_expr_ctxs_(probe_expr_ctxs),
       stores_nulls_(stores_nulls),
@@ -93,17 +93,13 @@ HashTableCtx::HashTableCtx(const std::vector<ExprContext*>& 
build_expr_ctxs,
       finds_some_nulls_(std::accumulate(
           finds_nulls_.begin(), finds_nulls_.end(), false, 
std::logical_or<bool>())),
       level_(0),
-      scratch_row_(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) * 
num_build_tuples))) {
+      scratch_row_(NULL),
+      tracker_(tracker) {
   DCHECK(!finds_some_nulls_ || stores_nulls_);
   // Compute the layout and buffer size to store the evaluated expr results
   DCHECK_EQ(build_expr_ctxs_.size(), probe_expr_ctxs_.size());
   DCHECK_EQ(build_expr_ctxs_.size(), finds_nulls_.size());
   DCHECK(!build_expr_ctxs_.empty());
-  results_buffer_size_ = Expr::ComputeResultsLayout(build_expr_ctxs_,
-      &expr_values_buffer_offsets_, &var_result_begin_);
-  expr_values_buffer_ = new uint8_t[results_buffer_size_];
-  memset(expr_values_buffer_, 0, sizeof(uint8_t) * results_buffer_size_);
-  expr_value_null_bits_ = new uint8_t[build_expr_ctxs.size()];
 
   // Populate the seeds to use for all the levels. TODO: revisit how we 
generate these.
   DCHECK_GE(max_levels, 0);
@@ -116,32 +112,65 @@ HashTableCtx::HashTableCtx(const 
std::vector<ExprContext*>& build_expr_ctxs,
   }
 }
 
+Status HashTableCtx::Create(RuntimeState* state,
+    const std::vector<ExprContext*>& build_expr_ctxs,
+    const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+    const std::vector<bool>& finds_nulls, int32_t initial_seed, int max_levels,
+    int num_build_tuples, MemTracker* tracker, scoped_ptr<HashTableCtx>* 
ht_ctx) {
+  ht_ctx->reset(new HashTableCtx(build_expr_ctxs, probe_expr_ctxs, 
stores_nulls,
+      finds_nulls, initial_seed, max_levels, tracker));
+  return ht_ctx->get()->Init(state, num_build_tuples);
+}
+
+Status HashTableCtx::Init(RuntimeState* state, int num_build_tuples) {
+  int scratch_row_size = sizeof(Tuple*) * num_build_tuples;
+  scratch_row_ = reinterpret_cast<TupleRow*>(malloc(scratch_row_size));
+  if (UNLIKELY(scratch_row_ == NULL)) {
+    return Status(Substitute("Failed to allocate $0 bytes for scratch row of "
+        "HashTableCtx.", scratch_row_size));
+  }
+  return expr_values_cache_.Init(state, tracker_, build_expr_ctxs_);
+}
+
 void HashTableCtx::Close() {
-  // TODO: use tr1::array?
-  DCHECK(expr_values_buffer_ != NULL);
-  delete[] expr_values_buffer_;
-  expr_values_buffer_ = NULL;
-  DCHECK(expr_value_null_bits_ != NULL);
-  delete[] expr_value_null_bits_;
-  expr_value_null_bits_ = NULL;
   free(scratch_row_);
   scratch_row_ = NULL;
+  expr_values_cache_.Close(tracker_);
+}
+
+uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
+  /// Use CRC hash at first level for better performance. Switch to murmur 
hash at
+  /// subsequent levels since CRC doesn't randomize well with different seed 
inputs.
+  if (level_ == 0) return HashUtil::Hash(input, len, hash);
+  return HashUtil::MurmurHash2_64(input, len, hash);
+}
+
+uint32_t HashTableCtx::HashCurrentRow() const {
+  DCHECK_LT(level_, seeds_.size());
+  if (expr_values_cache_.var_result_offset() == -1) {
+    /// This handles NULLs implicitly since a constant seed value was put
+    /// into results buffer for nulls.
+    return Hash(expr_values_cache_.cur_expr_values_,
+        expr_values_cache_.expr_values_bytes_per_row(), seeds_[level_]);
+  } else {
+    return HashTableCtx::HashVariableLenRow();
+  }
 }
 
 bool HashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs) {
   bool has_null = false;
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < ctxs.size(); ++i) {
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+    void* loc = expr_values_cache_.ExprValuePtr(i);
     void* val = ctxs[i]->GetValue(row);
     if (val == NULL) {
       // If the table doesn't store nulls, no reason to keep evaluating
       if (!stores_nulls_) return true;
-
-      expr_value_null_bits_[i] = true;
+      exprs_nullness[i] = true;
       val = reinterpret_cast<void*>(&NULL_VALUE);
       has_null = true;
     } else {
-      expr_value_null_bits_[i] = false;
+      exprs_nullness[i] = false;
     }
     DCHECK_LE(build_expr_ctxs_[i]->root()->type().GetSlotSize(),
         sizeof(NULL_VALUE));
@@ -152,18 +181,20 @@ bool HashTableCtx::EvalRow(TupleRow* row, const 
vector<ExprContext*>& ctxs) {
 
 uint32_t HashTableCtx::HashVariableLenRow() const {
   uint32_t hash = seeds_[level_];
+  int var_result_offset = expr_values_cache_.var_result_offset();
   // Hash the non-var length portions (if there are any)
-  if (var_result_begin_ != 0) {
-    hash = Hash(expr_values_buffer_, var_result_begin_, hash);
+  if (var_result_offset != 0) {
+    hash = Hash(expr_values_cache_.cur_expr_values_, var_result_offset, hash);
   }
 
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
-    // non-string and null slots are already part of expr_values_buffer
+    // non-string and null slots are already part of cur_expr_values_
     if (build_expr_ctxs_[i]->root()->type().type != TYPE_STRING &&
         build_expr_ctxs_[i]->root()->type().type != TYPE_VARCHAR) continue;
 
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    if (expr_value_null_bits_[i]) {
+    void* loc = expr_values_cache_.ExprValuePtr(i);
+    if (exprs_nullness[i]) {
       // Hash the null random seed values at 'loc'
       hash = Hash(loc, sizeof(StringValue), hash);
     } else {
@@ -178,17 +209,18 @@ uint32_t HashTableCtx::HashVariableLenRow() const {
 
 template<bool FORCE_NULL_EQUALITY>
 bool HashTableCtx::Equals(TupleRow* build_row) const {
+  uint8_t* exprs_nullness = expr_values_cache_.ExprValueNullPtr(0);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     void* val = build_expr_ctxs_[i]->GetValue(build_row);
     if (val == NULL) {
       if (!(FORCE_NULL_EQUALITY || finds_nulls_[i])) return false;
-      if (!expr_value_null_bits_[i]) return false;
+      if (!exprs_nullness[i]) return false;
       continue;
     } else {
-      if (expr_value_null_bits_[i]) return false;
+      if (exprs_nullness[i]) return false;
     }
 
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+    void* loc = expr_values_cache_.ExprValuePtr(i);
     if (!RawValue::Eq(loc, val, build_expr_ctxs_[i]->root()->type())) {
       return false;
     }
@@ -199,6 +231,113 @@ bool HashTableCtx::Equals(TupleRow* build_row) const {
 template bool HashTableCtx::Equals<true>(TupleRow* build_row) const;
 template bool HashTableCtx::Equals<false>(TupleRow* build_row) const;
 
+HashTableCtx::ExprValuesCache::ExprValuesCache()
+    : capacity_(0),
+      cur_expr_values_(NULL),
+      cur_expr_values_null_(NULL),
+      cur_expr_values_hash_(NULL),
+      cur_expr_values_hash_end_(NULL),
+      expr_values_array_(NULL),
+      expr_values_null_array_(NULL),
+      expr_values_hash_array_(NULL),
+      null_bitmap_(0) { }
+
+Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state,
+    MemTracker* tracker, const std::vector<ExprContext*>& build_expr_ctxs) {
+  // Initialize the number of expressions.
+  num_exprs_ = build_expr_ctxs.size();
+  // Compute the layout of evaluated values of a row.
+  expr_values_bytes_per_row_ = Expr::ComputeResultsLayout(build_expr_ctxs,
+      &expr_values_offsets_, &var_result_offset_);
+  if (expr_values_bytes_per_row_ == 0) {
+    DCHECK_EQ(num_exprs_, 0);
+    return Status::OK();
+  }
+  DCHECK_GT(expr_values_bytes_per_row_, 0);
+  // Compute the maximum number of cached rows which can fit in the memory 
budget.
+  // TODO: Find the optimal prefetch batch size. This may be something
+  // processor dependent so we may need calibration at Impala startup time.
+  capacity_ = std::max(1, std::min(state->batch_size(),
+      MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
+
+  int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+  if (!tracker->TryConsume(mem_usage)) {
+    capacity_ = 0;
+    string details = Substitute("HashTableCtx::ExprValuesCache failed to 
allocate $0 bytes.",
+        mem_usage);
+    return tracker->MemLimitExceeded(state, details, mem_usage);
+  }
+
+  int expr_values_size = expr_values_bytes_per_row_ * capacity_;
+  expr_values_array_.reset(new uint8_t[expr_values_size]);
+  cur_expr_values_ = expr_values_array_.get();
+  memset(cur_expr_values_, 0, expr_values_size);
+
+  int expr_values_null_size = num_exprs_ * capacity_;
+  expr_values_null_array_.reset(new uint8_t[expr_values_null_size]);
+  cur_expr_values_null_ = expr_values_null_array_.get();
+  memset(cur_expr_values_null_, 0, expr_values_null_size);
+
+  expr_values_hash_array_.reset(new uint32_t[capacity_]);
+  cur_expr_values_hash_ = expr_values_hash_array_.get();
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  memset(cur_expr_values_hash_, 0, sizeof(uint32) * capacity_);
+
+  null_bitmap_.Reset(capacity_);
+  return Status::OK();
+}
+
+void HashTableCtx::ExprValuesCache::Close(MemTracker* tracker) {
+  if (capacity_ == 0) return;
+  cur_expr_values_ = NULL;
+  cur_expr_values_null_ = NULL;
+  cur_expr_values_hash_ = NULL;
+  cur_expr_values_hash_end_ = NULL;
+  expr_values_array_.reset();
+  expr_values_null_array_.reset();
+  expr_values_hash_array_.reset();
+  null_bitmap_.Reset(0);
+  int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
+  tracker->Release(mem_usage);
+}
+
+int HashTableCtx::ExprValuesCache::MemUsage(int capacity,
+    int expr_values_bytes_per_row, int num_exprs) {
+  return expr_values_bytes_per_row * capacity + // expr_values_array_
+      num_exprs * capacity +                    // expr_values_null_array_
+      sizeof(uint32) * capacity +               // expr_values_hash_array_
+      Bitmap::MemUsage(capacity);               // null_bitmap_
+}
+
+uint8_t* HashTableCtx::ExprValuesCache::ExprValuePtr(int expr_idx) const {
+  return cur_expr_values_ + expr_values_offsets_[expr_idx];
+}
+
+uint8_t* HashTableCtx::ExprValuesCache::ExprValueNullPtr(int expr_idx) const {
+  return cur_expr_values_null_ + expr_idx;
+}
+
+void HashTableCtx::ExprValuesCache::ResetIterators() {
+  cur_expr_values_ = expr_values_array_.get();
+  cur_expr_values_null_ = expr_values_null_array_.get();
+  cur_expr_values_hash_ = expr_values_hash_array_.get();
+}
+
+void HashTableCtx::ExprValuesCache::Reset() {
+  ResetIterators();
+  // Set the end pointer after resetting the other pointers so they point to
+  // the same location.
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  null_bitmap_.SetAllBits(false);
+}
+
+void HashTableCtx::ExprValuesCache::ResetForRead() {
+  // Record the end of hash values iterator to be used in AtEnd().
+  // Do it before resetting the pointers.
+  cur_expr_values_hash_end_ = cur_expr_values_hash_;
+  ResetIterators();
+}
+
 const double HashTable::MAX_FILL_FACTOR = 0.75f;
 
 HashTable* HashTable::Create(RuntimeState* state,
@@ -306,7 +445,7 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const 
HashTableCtx* ht_ctx) {
     Bucket* bucket_to_copy = &buckets_[iter.bucket_idx_];
     bool found = false;
     int64_t bucket_idx =
-        Probe<true>(new_buckets, num_buckets, false, NULL, NULL, 
bucket_to_copy->hash, &found);
+        Probe<true>(new_buckets, num_buckets, NULL, bucket_to_copy->hash, 
&found);
     DCHECK(!found);
     DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even 
though "
         " there are free buckets. " << num_buckets << " " << 
num_filled_buckets_;
@@ -458,30 +597,71 @@ static void CodegenAssignNullValue(LlvmCodeGen* codegen,
 }
 
 // Codegen for evaluating a tuple row over either build_expr_ctxs_ or 
probe_expr_ctxs_.
-// For the case where we are joining on a single int, the IR looks like
-// define i1 @EvalBuildRow(%"class.impala::HashTableCtx"* %this_ptr,
-//                         %"class.impala::TupleRow"* %row) #20 {
+// For a group by with (big int, string) the IR looks like
+// define i1 @EvalProbeRow(%"class.impala::HashTableCtx"* %this_ptr,
+//                         %"class.impala::TupleRow"* %row) #33 {
 // entry:
-//   %result = call i64 @GetSlotRef1(%"class.impala::ExprContext"* inttoptr
-//                                     (i64 67971664 to 
%"class.impala::ExprContext"*),
-//                                   %"class.impala::TupleRow"* %row)
+//   %0 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %1 = load i8*, i8** inttoptr (i64 230325064 to i8**)
+//   %loc_addr = getelementptr i8, i8* %0, i32 0
+//   %loc = bitcast i8* %loc_addr to i32*
+//   %result = call i64 @GetSlotRef.3(%"class.impala::ExprContext"*
+//             inttoptr (i64 158123712 to %"class.impala::ExprContext"*),
+//             %"class.impala::TupleRow"* %row)
 //   %is_null = trunc i64 %result to i1
-//   %0 = zext i1 %is_null to i8
-//   store i8 %0, i8* inttoptr (i64 95753144 to i8*)
+//   %2 = zext i1 %is_null to i8
+//   %null_byte_loc = getelementptr i8, i8* %1, i32 0
+//   store i8 %2, i8* %null_byte_loc
 //   br i1 %is_null, label %null, label %not_null
 //
 // null:                                             ; preds = %entry
-//   store i32 -2128831035, i32* inttoptr (i64 95753128 to i32*)
+//   store i32 -2128831035, i32* %loc
 //   br label %continue
 //
 // not_null:                                         ; preds = %entry
-//   %1 = ashr i64 %result, 32
-//   %2 = trunc i64 %1 to i32
-//   store i32 %2, i32* inttoptr (i64 95753128 to i32*)
+//   %3 = ashr i64 %result, 32
+//   %4 = trunc i64 %3 to i32
+//   store i32 %4, i32* %loc
 //   br label %continue
 //
 // continue:                                         ; preds = %not_null, %null
-//   ret i1 true
+//   %is_null_phi = phi i1 [ true, %null ], [ false, %not_null ]
+//   %has_null = or i1 false, %is_null_phi
+//   %loc_addr1 = getelementptr i8, i8* %0, i32 8
+//   %loc2 = bitcast i8* %loc_addr1 to %"struct.impala::StringValue"*
+//   %result6 = call { i64, i8* } @GetSlotRef.4(%"class.impala::ExprContext"*
+//              inttoptr (i64 158123904 to %"class.impala::ExprContext"*),
+//              %"class.impala::TupleRow"* %row)
+//   %5 = extractvalue { i64, i8* } %result6, 0
+//   %is_null7 = trunc i64 %5 to i1
+//   %6 = zext i1 %is_null7 to i8
+//   %null_byte_loc8 = getelementptr i8, i8* %1, i32 1
+//   store i8 %6, i8* %null_byte_loc8
+//   br i1 %is_null7, label %null3, label %not_null4
+//
+// null3:                                            ; preds = %continue
+//   %string_ptr = getelementptr inbounds %"struct.impala::StringValue",
+//                 %"struct.impala::StringValue"* %loc2, i32 0, i32 0
+//   %string_len = getelementptr inbounds %"struct.impala::StringValue",
+//                 %"struct.impala::StringValue"* %loc2, i32 0, i32 1
+//   store i8* inttoptr (i32 -2128831035 to i8*), i8** %string_ptr
+//   store i32 -2128831035, i32* %string_len
+//   br label %continue5
+//
+// not_null4:                                        ; preds = %continue
+//   %result9 = extractvalue { i64, i8* } %result6, 1
+//   %7 = insertvalue %"struct.impala::StringValue" zeroinitializer, i8* 
%result9, 0
+//   %8 = extractvalue { i64, i8* } %result6, 0
+//   %9 = ashr i64 %8, 32
+//   %10 = trunc i64 %9 to i32
+//   %11 = insertvalue %"struct.impala::StringValue" %7, i32 %10, 1
+//   store %"struct.impala::StringValue" %11, %"struct.impala::StringValue"* 
%loc2
+//   br label %continue5
+//
+// continue5:                                        ; preds = %not_null4, 
%null3
+//   %is_null_phi10 = phi i1 [ true, %null3 ], [ false, %not_null4 ]
+//   %has_null11 = or i1 %has_null, %is_null_phi10
+//   ret i1 %has_null11
 // }
 // For each expr, we create 3 code blocks.  The null, not null and continue 
blocks.
 // Both the null and not null branch into the continue block.  The continue 
block
@@ -509,7 +689,7 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, 
bool build, Function**
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
   LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : 
"EvalProbeRow",
       codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
@@ -519,18 +699,29 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, 
bool build, Function**
   LlvmCodeGen::LlvmBuilder builder(context);
   Value* args[2];
   *fn = prototype.GeneratePrototype(&builder, args);
-
   Value* row = args[1];
   Value* has_null = codegen->false_value();
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   for (int i = 0; i < ctxs.size(); ++i) {
     // TODO: refactor this to somewhere else?  This is not hash table specific 
except for
     // the null handling bit and would be used for anyone that needs to 
materialize a
     // vector of exprs
     // Convert result buffer to llvm ptr type
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    Value* llvm_loc = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(ctxs[i]->root()->type()), loc);
+    int offset = expr_values_cache_.expr_values_offsets(i);
+    Value* loc = builder.CreateGEP(NULL, cur_expr_values,
+        codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
+    Value* llvm_loc = builder.CreatePointerCast(loc,
+        codegen->GetPtrType(ctxs[i]->root()->type()), "loc");
 
     BasicBlock* null_block = BasicBlock::Create(context, "null", *fn);
     BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", *fn);
@@ -555,11 +746,9 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, 
bool build, Function**
 
     // Set null-byte result
     Value* null_byte = builder.CreateZExt(is_null, 
codegen->GetType(TYPE_TINYINT));
-    uint8_t* null_byte_loc = &expr_value_null_bits_[i];
-    Value* llvm_null_byte_loc =
-        codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+    Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+        codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
     builder.CreateStore(null_byte, llvm_null_byte_loc);
-
     builder.CreateCondBr(is_null, null_block, not_null_block);
 
     // Null block
@@ -599,30 +788,37 @@ Status HashTableCtx::CodegenEvalRow(RuntimeState* state, 
bool build, Function**
 
 // Codegen for hashing the current row.  In the case with both string and 
non-string data
 // (group by int_col, string_col), the IR looks like:
-// define i32 @HashCurrentRow(%"class.impala::HashTableCtx"* %this_ptr) #20 {
+// define i32 @HashCurrentRow(%"class.impala::HashTableCtx"* %this_ptr) #33 {
 // entry:
-//   %seed = call i32 @GetHashSeed(%"class.impala::HashTableCtx"* %this_ptr)
-//   %0 = call i32 @CrcHash16(i8* inttoptr (i64 119151296 to i8*), i32 16, i32 
%seed)
-//   %1 = load i8* inttoptr (i64 119943721 to i8*)
-//   %2 = icmp ne i8 %1, 0
-//   br i1 %2, label %null, label %not_null
+//   %0 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %1 = load i8*, i8** inttoptr (i64 230325064 to i8**)
+//   %seed = call i32 @_ZNK6impala12HashTableCtx11GetHashSeedEv(
+//           %"class.impala::HashTableCtx"* %this_ptr)
+//   %hash = call i32 @CrcHash8(i8* %0, i32 8, i32 %seed)
+//   %loc_addr = getelementptr i8, i8* %0, i32 8
+//   %null_byte_loc = getelementptr i8, i8* %1, i32 1
+//   %null_byte = load i8, i8* %null_byte_loc
+//   %is_null = icmp ne i8 %null_byte, 0
+//   br i1 %is_null, label %null, label %not_null
 //
 // null:                                             ; preds = %entry
-//   %3 = call i32 @CrcHash161(i8* inttoptr (i64 119151312 to i8*), i32 16, 
i32 %0)
+//   %str_null = call i32 @CrcHash16(i8* %loc_addr, i32 16, i32 %hash)
 //   br label %continue
 //
 // not_null:                                         ; preds = %entry
-//   %4 = load i8** getelementptr inbounds (%"struct.impala::StringValue"* 
inttoptr
-//       (i64 119151312 to %"struct.impala::StringValue"*), i32 0, i32 0)
-//   %5 = load i32* getelementptr inbounds (%"struct.impala::StringValue"* 
inttoptr
-//       (i64 119151312 to %"struct.impala::StringValue"*), i32 0, i32 1)
-//   %6 = call i32 @IrCrcHash(i8* %4, i32 %5, i32 %0)
+//   %str_val = bitcast i8* %loc_addr to %"struct.impala::StringValue"*
+//   %2 = getelementptr inbounds %"struct.impala::StringValue",
+//        %"struct.impala::StringValue"* %str_val, i32 0, i32 0
+//   %3 = getelementptr inbounds %"struct.impala::StringValue",
+//        %"struct.impala::StringValue"* %str_val, i32 0, i32 1
+//   %ptr = load i8*, i8** %2
+//   %len = load i32, i32* %3
+//   %string_hash = call i32 @IrCrcHash(i8* %ptr, i32 %len, i32 %hash)
 //   br label %continue
 //
 // continue:                                         ; preds = %not_null, %null
-//   %7 = phi i32 [ %6, %not_null ], [ %3, %null ]
-//   call void @set_hash(%"class.impala::HashTableCtx"* %this_ptr, i32 %7)
-//   ret i32 %7
+//   %hash_phi = phi i32 [ %string_hash, %not_null ], [ %str_null, %null ]
+//   ret i32 %hash_phi
 // }
 Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* state, bool 
use_murmur,
     Function** fn) {
@@ -640,6 +836,7 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
 
   LlvmCodeGen::FnPrototype prototype(codegen,
       (use_murmur ? "MurmurHashCurrentRow" : "HashCurrentRow"),
@@ -651,6 +848,16 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
   Value* this_arg;
   *fn = prototype.GeneratePrototype(&builder, &this_arg);
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr =
+      codegen->CastPtrToLlvmPtr(buffer_ptr_type, 
&expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr =
+      codegen->CastPtrToLlvmPtr(buffer_ptr_type, 
&expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   // Call GetHashSeed() to get seeds_[level_]
   Function* get_hash_seed_fn =
       codegen->GetFunction(IRFunction::HASH_TABLE_GET_HASH_SEED, false);
@@ -658,25 +865,26 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
       "seed");
 
   Value* hash_result = seed;
-  Value* data = codegen->CastPtrToLlvmPtr(codegen->ptr_type(), 
expr_values_buffer_);
-  if (var_result_begin_ == -1) {
-    // No variable length slots, just hash what is in 'expr_values_buffer_'
-    if (results_buffer_size_ > 0) {
+  const int var_result_offset = expr_values_cache_.var_result_offset();
+  const int expr_values_bytes_per_row = 
expr_values_cache_.expr_values_bytes_per_row();
+  if (var_result_offset == -1) {
+    // No variable length slots, just hash what is in 'expr_expr_values_cache_'
+    if (expr_values_bytes_per_row > 0) {
       Function* hash_fn = use_murmur ?
-                          codegen->GetMurmurHashFunction(results_buffer_size_) 
:
-                          codegen->GetHashFunction(results_buffer_size_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, results_buffer_size_);
+                          
codegen->GetMurmurHashFunction(expr_values_bytes_per_row) :
+                          codegen->GetHashFunction(expr_values_bytes_per_row);
+      Value* len = codegen->GetIntConstant(TYPE_INT, 
expr_values_bytes_per_row);
       hash_result = builder.CreateCall(hash_fn,
-          ArrayRef<Value*>({data, len, hash_result}), "hash");
+          ArrayRef<Value*>({cur_expr_values, len, hash_result}), "hash");
     }
   } else {
-    if (var_result_begin_ > 0) {
+    if (var_result_offset > 0) {
       Function* hash_fn = use_murmur ?
-                          codegen->GetMurmurHashFunction(var_result_begin_) :
-                          codegen->GetHashFunction(var_result_begin_);
-      Value* len = codegen->GetIntConstant(TYPE_INT, var_result_begin_);
+                          codegen->GetMurmurHashFunction(var_result_offset) :
+                          codegen->GetHashFunction(var_result_offset);
+      Value* len = codegen->GetIntConstant(TYPE_INT, var_result_offset);
       hash_result = builder.CreateCall(hash_fn,
-          ArrayRef<Value*>({data, len, hash_result}), "hash");
+          ArrayRef<Value*>({cur_expr_values, len, hash_result}), "hash");
     }
 
     // Hash string slots
@@ -689,7 +897,9 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
       BasicBlock* continue_block = NULL;
       Value* str_null_result = NULL;
 
-      void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
+      int offset = expr_values_cache_.expr_values_offsets(i);
+      Value* llvm_loc = builder.CreateGEP(NULL, cur_expr_values,
+          codegen->GetIntConstant(TYPE_INT, offset), "loc_addr");
 
       // If the hash table stores nulls, we need to check if the stringval
       // evaluated to NULL
@@ -698,9 +908,8 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
         not_null_block = BasicBlock::Create(context, "not_null", *fn);
         continue_block = BasicBlock::Create(context, "continue", *fn);
 
-        uint8_t* null_byte_loc = &expr_value_null_bits_[i];
-        Value* llvm_null_byte_loc =
-            codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+        Value* llvm_null_byte_loc = builder.CreateGEP(NULL, 
cur_expr_values_null,
+            codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
         Value* null_byte = builder.CreateLoad(llvm_null_byte_loc, "null_byte");
         Value* is_null = builder.CreateICmpNE(null_byte,
             codegen->GetIntConstant(TYPE_TINYINT, 0), "is_null");
@@ -712,7 +921,6 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
         Function* null_hash_fn = use_murmur ?
                                  
codegen->GetMurmurHashFunction(sizeof(StringValue)) :
                                  codegen->GetHashFunction(sizeof(StringValue));
-        Value* llvm_loc = codegen->CastPtrToLlvmPtr(codegen->ptr_type(), loc);
         Value* len = codegen->GetIntConstant(TYPE_INT, sizeof(StringValue));
         str_null_result = builder.CreateCall(null_hash_fn,
             ArrayRef<Value*>({llvm_loc, len, hash_result}), "str_null");
@@ -722,7 +930,8 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
       }
 
       // Convert expr_values_buffer_ loc to llvm value
-      Value* str_val = 
codegen->CastPtrToLlvmPtr(codegen->GetPtrType(TYPE_STRING), loc);
+      Value* str_val = builder.CreatePointerCast(llvm_loc,
+          codegen->GetPtrType(TYPE_STRING), "str_val");
 
       Value* ptr = builder.CreateStructGEP(NULL, str_val, 0);
       Value* len = builder.CreateStructGEP(NULL, str_val, 1);
@@ -759,55 +968,71 @@ Status HashTableCtx::CodegenHashCurrentRow(RuntimeState* 
state, bool use_murmur,
   return Status::OK();
 }
 
-// Codegen for HashTableCtx::Equals.  For a hash table with two exprs 
(string,int),
+// Codegen for HashTableCtx::Equals.  For a group by with (bigint, string),
 // the IR looks like:
 //
 // define i1 @Equals(%"class.impala::HashTableCtx"* %this_ptr,
-//                   %"class.impala::TupleRow"* %row) {
+//                   %"class.impala::TupleRow"* %row) #33 {
 // entry:
+//   %0 = alloca { i64, i8* }
+//   %1 = load i8*, i8** inttoptr (i64 230325056 to i8**)
+//   %2 = load i8*, i8** inttoptr (i64 230325064 to i8**)
 //   %result = call i64 @GetSlotRef(%"class.impala::ExprContext"* inttoptr
-//                                  (i64 146381856 to 
%"class.impala::ExprContext"*),
-//                                  %"class.impala::TupleRow"* %row)
-//   %0 = trunc i64 %result to i1
-//   br i1 %0, label %null, label %not_null
+//             (i64 165557504 to %"class.impala::ExprContext"*),
+//             %"class.impala::TupleRow"* %row)
+//   %is_null = trunc i64 %result to i1
+//   %null_byte_loc = getelementptr i8, i8* %2, i32 0
+//   %3 = load i8, i8* %null_byte_loc
+//   %4 = icmp ne i8 %3, 0
+//   %loc = getelementptr i8, i8* %1, i32 0
+//   %row_val = bitcast i8* %loc to i32*
+//   br i1 %is_null, label %null, label %not_null
 //
-// false_block:                            ; preds = %not_null2, %null1, 
%not_null, %null
+// false_block:                ; preds = %cmp9, %not_null2, %null1, %cmp, 
%not_null, %null
 //   ret i1 false
 //
 // null:                                             ; preds = %entry
-//   br i1 false, label %continue, label %false_block
+//   br i1 %4, label %continue, label %false_block
 //
 // not_null:                                         ; preds = %entry
-//   %1 = load i32* inttoptr (i64 104774368 to i32*)
-//   %2 = ashr i64 %result, 32
-//   %3 = trunc i64 %2 to i32
-//   %cmp_raw = icmp eq i32 %3, %1
-//   br i1 %cmp_raw, label %continue, label %false_block
+//   br i1 %4, label %false_block, label %cmp
 //
-// continue:                                         ; preds = %not_null, %null
-//   %result4 = call { i64, i8* } @GetSlotRef1(
-//       %"class.impala::ExprContext"* inttoptr
-//       (i64 146381696 to %"class.impala::ExprContext"*),
-//       %"class.impala::TupleRow"* %row)
-//   %4 = extractvalue { i64, i8* } %result4, 0
-//   %5 = trunc i64 %4 to i1
-//   br i1 %5, label %null1, label %not_null2
+// continue:                                         ; preds = %cmp, %null
+//   %result4 = call { i64, i8* } @GetSlotRef.2(%"class.impala::ExprContext"*
+//              inttoptr (i64 165557696 to %"class.impala::ExprContext"*),
+//              %"class.impala::TupleRow"* %row)
+//   %5 = extractvalue { i64, i8* } %result4, 0
+//   %is_null5 = trunc i64 %5 to i1
+//   %null_byte_loc6 = getelementptr i8, i8* %2, i32 1
+//   %6 = load i8, i8* %null_byte_loc6
+//   %7 = icmp ne i8 %6, 0
+//   %loc7 = getelementptr i8, i8* %1, i32 8
+//   %row_val8 = bitcast i8* %loc7 to %"struct.impala::StringValue"*
+//   br i1 %is_null5, label %null1, label %not_null2
+//
+// cmp:                                              ; preds = %not_null
+//   %8 = load i32, i32* %row_val
+//   %9 = ashr i64 %result, 32
+//   %10 = trunc i64 %9 to i32
+//   %cmp_raw = icmp eq i32 %10, %8
+//   br i1 %cmp_raw, label %continue, label %false_block
 //
 // null1:                                            ; preds = %continue
-//   br i1 false, label %continue3, label %false_block
+//   br i1 %7, label %continue3, label %false_block
 //
 // not_null2:                                        ; preds = %continue
-//   %6 = extractvalue { i64, i8* } %result4, 0
-//   %7 = ashr i64 %6, 32
-//   %8 = trunc i64 %7 to i32
-//   %result5 = extractvalue { i64, i8* } %result4, 1
-//   %cmp_raw6 = call i1 @_Z11StringValEQPciPKN6impala11StringValueE(
-//       i8* %result5, i32 %8, %"struct.impala::StringValue"* inttoptr
-//       (i64 104774384 to %"struct.impala::StringValue"*))
-//   br i1 %cmp_raw6, label %continue3, label %false_block
+//   br i1 %7, label %false_block, label %cmp9
 //
-// continue3:                                        ; preds = %not_null2, 
%null1
+// continue3:                                        ; preds = %cmp9, %null1
 //   ret i1 true
+//
+// cmp9:                                             ; preds = %not_null2
+//   store { i64, i8* } %result4, { i64, i8* }* %0
+//   %11 = bitcast { i64, i8* }* %0 to %"struct.impala_udf::StringVal"*
+//   %cmp_raw10 = call i1 @_Z13StringValueEqRKN10impala_udf9StringValERKN6
+//                impala11StringValueE(%"struct.impala_udf::StringVal"* %11,
+//                %"struct.impala::StringValue"* %row_val8)
+//   br i1 %cmp_raw10, label %continue3, label %false_block
 // }
 Status HashTableCtx::CodegenEquals(RuntimeState* state, bool 
force_null_equality,
     Function** fn) {
@@ -828,7 +1053,7 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, 
bool force_null_equality
   Type* this_type = codegen->GetType(HashTableCtx::LLVM_CLASS_NAME);
   DCHECK(this_type != NULL);
   PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
+  PointerType* buffer_ptr_type = PointerType::get(codegen->ptr_type(), 0);
   LlvmCodeGen::FnPrototype prototype(codegen, "Equals", 
codegen->GetType(TYPE_BOOLEAN));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
   prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
@@ -839,6 +1064,16 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, 
bool force_null_equality
   *fn = prototype.GeneratePrototype(&builder, args);
   Value* row = args[1];
 
+  // Load cur_expr_values_ into a LLVM pointer.
+  Value* cur_expr_values_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_);
+  Value* cur_expr_values = builder.CreateLoad(cur_expr_values_ptr);
+
+  // Load cur_expr_values_null_ into a LLVM pointer.
+  Value* cur_expr_values_null_ptr = codegen->CastPtrToLlvmPtr(buffer_ptr_type,
+      &expr_values_cache_.cur_expr_values_null_);
+  Value* cur_expr_values_null = builder.CreateLoad(cur_expr_values_null_ptr);
+
   BasicBlock* false_block = BasicBlock::Create(context, "false_block", *fn);
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     BasicBlock* null_block = BasicBlock::Create(context, "null", *fn);
@@ -862,25 +1097,26 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state, 
bool force_null_equality
         build_expr_ctxs_[i]->root()->type(), expr_fn, expr_fn_args, "result");
     Value* is_null = result.GetIsNull();
 
-    // Determine if row is null (i.e. expr_value_null_bits_[i] == true). In
+    // Determine if row is null (i.e. cur_expr_values_null_[i] == true). In
     // the case where the hash table does not store nulls, this is always 
false.
     Value* row_is_null = codegen->false_value();
-    uint8_t* null_byte_loc = &expr_value_null_bits_[i];
 
     // We consider null values equal if we are comparing build rows or if the 
join
     // predicate is <=>
     if (force_null_equality || finds_nulls_[i]) {
-      Value* llvm_null_byte_loc =
-          codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
+      Value* llvm_null_byte_loc = builder.CreateGEP(NULL, cur_expr_values_null,
+          codegen->GetIntConstant(TYPE_INT, i), "null_byte_loc");
       Value* null_byte = builder.CreateLoad(llvm_null_byte_loc);
       row_is_null = builder.CreateICmpNE(null_byte,
           codegen->GetIntConstant(TYPE_TINYINT, 0));
     }
 
-    // Get llvm value for row_val from 'expr_values_buffer_'
-    void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    Value* row_val = codegen->CastPtrToLlvmPtr(
-        codegen->GetPtrType(build_expr_ctxs_[i]->root()->type()), loc);
+    // Get llvm value for row_val from 'cur_expr_values_'
+    int offset = expr_values_cache_.expr_values_offsets(i);
+    Value* loc = builder.CreateGEP(NULL, cur_expr_values,
+        codegen->GetIntConstant(TYPE_INT, offset), "loc");
+    Value* row_val = builder.CreatePointerCast(loc,
+        codegen->GetPtrType(build_expr_ctxs_[i]->root()->type()), "row_val");
 
     // Branch for GetValue() returning NULL
     builder.CreateCondBr(is_null, null_block, not_null_block);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 673822e..6fc4169 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -111,10 +111,12 @@ class HashTableCtx {
   ///  - probe_exprs are used during FindProbeRow()
   ///  - stores_nulls: if false, TupleRows with nulls are ignored during Insert
   ///  - finds_nulls: if finds_nulls[i] is false, FindProbeRow() returns End() 
for
-  ///      TupleRows with nulls in position i even if stores_nulls is true.
-  ///  - initial_seed: Initial seed value to use when computing hashes for 
rows with
+  ///        TupleRows with nulls in position i even if stores_nulls is true.
+  ///  - initial_seed: initial seed value to use when computing hashes for 
rows with
   ///    level 0. Other levels have their seeds derived from this seed.
-  ///  - The max levels we will hash with.
+  ///  - max_levels: the max levels we will hash with.
+  ///  - tracker: the memory tracker of the exec node which owns this hash 
table context.
+  ///        Memory usage of expression values cache is charged against it.
   /// TODO: stores_nulls is too coarse: for a hash table in which some columns 
are joined
   ///       with '<=>' and others with '=', stores_nulls could distinguish 
between columns
   ///       in which nulls are stored and columns in which they are not, which 
could save
@@ -122,7 +124,18 @@ class HashTableCtx {
   HashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
       const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
       const std::vector<bool>& finds_nulls, int32_t initial_seed, int 
max_levels,
-      int num_build_tuples);
+      MemTracker* tracker);
+
+  /// Create a hash table context with the specified parameters, invoke Init() 
to
+  /// initialize the new hash table context and return it in 'ht_ctx'. Please 
see header
+  /// comments of HashTableCtx constructor for details of the parameters.
+  /// 'num_build_tuples' is the number of tuples of a row in the build side, 
used for
+  /// computing the size of a scratch row.
+  static Status Create(RuntimeState* state,
+      const std::vector<ExprContext*>& build_expr_ctxs,
+      const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+      const std::vector<bool>& finds_nulls, int32_t initial_seed, int 
max_levels,
+      int num_build_tuples, MemTracker* tracker, 
boost::scoped_ptr<HashTableCtx>* ht_ctx);
 
   /// Call to cleanup any resources.
   void Close();
@@ -135,29 +148,28 @@ class HashTableCtx {
 
   TupleRow* ALWAYS_INLINE scratch_row() const { return scratch_row_; }
 
-  /// Returns the results of the exprs at 'expr_idx' evaluated over the last 
row
-  /// processed.
+  /// Returns the results of the expression at 'expr_idx' evaluated at the 
current row.
   /// This value is invalid if the expr evaluated to NULL.
   /// TODO: this is an awkward abstraction but aggregation node can take 
advantage of
   /// it and save some expr evaluation calls.
-  void* ALWAYS_INLINE last_expr_value(int expr_idx) const {
-    return expr_values_buffer_ + expr_values_buffer_offsets_[expr_idx];
+  void* ALWAYS_INLINE ExprValue(int expr_idx) const {
+    return expr_values_cache_.ExprValuePtr(expr_idx);
   }
 
-  /// Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
-  bool ALWAYS_INLINE last_expr_value_null(int expr_idx) const {
-    return expr_value_null_bits_[expr_idx];
+  /// Returns if the expression at 'expr_idx' is evaluated to NULL for the 
current row.
+  bool ALWAYS_INLINE ExprValueNull(int expr_idx) const {
+    return static_cast<bool>(*expr_values_cache_.ExprValueNullPtr(expr_idx));
   }
 
-  /// Evaluate and hash the build/probe row, returning in *hash. Returns false 
if this
-  /// row should be rejected (doesn't need to be processed further) because it
-  /// contains NULL.
-  /// These need to be inlined in the IR module so we can find and replace the 
calls to
-  /// EvalBuildRow()/EvalProbeRow().
-  bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row, uint32_t* hash);
-  bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row, uint32_t* hash);
-
-  int ALWAYS_INLINE results_buffer_size() const { return results_buffer_size_; 
}
+  /// Evaluate and hash the build/probe row, saving the evaluation to the 
current row of
+  /// the ExprValuesCache in this hash table context: the results are saved in
+  /// 'cur_expr_values_', the nullness of expressions values in 
'cur_expr_values_null_',
+  /// and the hashed expression values in 'cur_expr_values_hash_'. Returns 
false if this
+  /// row should be rejected  (doesn't need to be processed further) because 
it contains
+  /// NULL. These need to be inlined in the IR module so we can find and 
replace the
+  /// calls to EvalBuildRow()/EvalProbeRow().
+  bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row);
+  bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row);
 
   /// Codegen for evaluating a tuple row.  Codegen'd function matches the 
signature
   /// for EvalBuildRow and EvalTupleRow.
@@ -165,13 +177,13 @@ class HashTableCtx {
   Status CodegenEvalRow(RuntimeState* state, bool build_row, llvm::Function** 
fn);
 
   /// Codegen for evaluating a TupleRow and comparing equality against
-  /// 'expr_values_buffer_'.  Function signature matches HashTable::Equals().
+  /// 'cur_expr_values_'.  Function signature matches HashTable::Equals().
   /// 'force_null_equality' is true if the generated equality function should 
treat
   /// all NULLs as equal. See the template parameter to HashTable::Equals().
   Status CodegenEquals(RuntimeState* state, bool force_null_equality,
       llvm::Function** fn);
 
-  /// Codegen for hashing the expr values in 'expr_values_buffer_'. Function 
prototype
+  /// Codegen for hashing the expr values in 'cur_expr_values_'. Function 
prototype
   /// matches HashCurrentRow identically. Unlike HashCurrentRow(), the 
returned function
   /// only uses a single hash function, rather than switching based on level_.
   /// If 'use_murmur' is true, murmur hash is used, otherwise CRC is used if 
the hardware
@@ -180,36 +192,198 @@ class HashTableCtx {
 
   static const char* LLVM_CLASS_NAME;
 
+  /// To enable prefetching, the hash table building and probing are pipelined 
by the
+  /// exec nodes. A set of rows in a row batch will be evaluated and hashed 
first and
+  /// the corresponding hash table buckets are prefetched before they are 
probed against
+  /// the hash table. ExprValuesCache is a container for caching the results of
+  /// expressions evaluations for the rows in a prefetch set to avoid 
re-evaluating the
+  /// rows again during probing. Expressions evaluation can be very expensive.
+  ///
+  /// The expression evaluation results are cached in the following data 
structures:
+  ///
+  /// - 'expr_values_array_' is an array caching the results of the rows
+  /// evaluated against either the build or probe expressions. 
'cur_expr_values_'
+  /// is a pointer into this array.
+  /// - 'expr_values_null_array_' is an array caching the nullness of each 
evaluated
+  /// expression in each row. 'cur_expr_values_null_' is a pointer into this 
array.
+  /// - 'expr_values_hash_array_' is an array of cached hash values of the 
rows.
+  /// 'cur_expr_values_hash_' is a pointer into this array.
+  /// - 'null_bitmap_' is a bitmap which indicates rows evaluated to NULL.
+  ///
+  /// ExprValuesCache provides an iterator like interface for performing a 
write pass
+  /// followed by a read pass. We refrain from providing an interface for 
random accesses
+  /// as there isn't a use case for it now and we want to avoid expensive 
multiplication
+  /// as the buffer size of each row is not necessarily power of two:
+  /// - Reset(), ResetForRead(): reset the iterators before writing / reading 
cached values.
+  /// - NextRow(): moves the iterators to point to the next row of cached 
values.
+  /// - AtEnd(): returns true if all cached rows have been read. Valid in read 
mode only.
+  ///
+  /// Various metadata information such as layout of results buffer is also 
stored in
+  /// this class. Note that the result buffer doesn't store variable length 
data. It only
+  /// contains pointers to the variable length data (e.g. if an expression 
value is a
+  /// StringValue).
+  ///
+  class ExprValuesCache {
+   public:
+    ExprValuesCache();
+
+    /// Allocates memory and initializes various data structures. Return error 
status
+    /// if memory allocation leads to the memory limits of the exec node to be 
exceeded.
+    /// 'tracker' is the memory tracker of the exec node which owns this 
HashTableCtx.
+    Status Init(RuntimeState* state, MemTracker* tracker,
+        const std::vector<ExprContext*>& build_expr_ctxs);
+
+    /// Frees up various resources and updates memory tracker with proper 
accounting.
+    /// 'tracker' should be the same memory tracker which was passed in for 
Init().
+    void Close(MemTracker* tracker);
+
+    /// Resets the cache states (iterators, end pointers etc) before writing.
+    void Reset();
+
+    /// Resets the iterators to the start before reading. Will record the 
current position
+    /// of the iterators in end pointer before resetting so AtEnd() can 
determine if all
+    /// cached values have been read.
+    void ResetForRead();
+
+    /// Advances the iterators to the next row by moving to the next entries 
in the
+    /// arrays of cached values.
+    void ALWAYS_INLINE NextRow();
+
+    /// Compute the total memory usage of this ExprValuesCache.
+    static int MemUsage(int capacity, int results_buffer_size, int 
num_build_exprs);
+
+    /// Returns the maximum number rows of expression values states which can 
be cached.
+    int ALWAYS_INLINE capacity() const { return capacity_; }
+
+    /// Returns the total size in bytes of a row of evaluated expressions' 
values.
+    int ALWAYS_INLINE expr_values_bytes_per_row() const {
+      return expr_values_bytes_per_row_;
+    }
+
+    /// Returns the offset into the result buffer of the first variable length
+    /// data results.
+    int ALWAYS_INLINE var_result_offset() const { return var_result_offset_; }
+
+    /// Returns true if the current read pass is complete, meaning all cached 
values
+    /// have been read.
+    bool ALWAYS_INLINE AtEnd() const {
+      return cur_expr_values_hash_ == cur_expr_values_hash_end_;
+    }
+
+    /// Returns true if the current row is null but nulls are not considered 
in the current
+    /// phase (build or probe).
+    bool ALWAYS_INLINE IsRowNull() const { return 
null_bitmap_.Get<false>(CurIdx()); }
+
+    /// Record in a bitmap that the current row is null but nulls are not 
considered in
+    /// the current phase (build or probe).
+    void ALWAYS_INLINE SetRowNull() { null_bitmap_.Set<false>(CurIdx(), true); 
}
+
+    /// Returns the hash values of the current row.
+    uint32_t ALWAYS_INLINE ExprValuesHash() const { return 
*cur_expr_values_hash_; }
+
+    /// Sets the hash values for the current row.
+    void ALWAYS_INLINE SetExprValuesHash(uint32_t hash) { 
*cur_expr_values_hash_ = hash; }
+
+    /// Returns a pointer to the expression value at 'expr_idx' for the 
current row.
+    uint8_t* ExprValuePtr(int expr_idx) const;
+
+    /// Returns a pointer to the boolean indicating the nullness of the 
expression value
+    /// at 'expr_idx'.
+    uint8_t* ExprValueNullPtr(int expr_idx) const;
+
+    /// Returns the offset into the results buffer of the expression value at 
'expr_idx'.
+    int ALWAYS_INLINE expr_values_offsets(int expr_idx) const {
+      return expr_values_offsets_[expr_idx];
+    }
+
+   private:
+    friend class HashTableCtx;
+
+    /// Resets the iterators to the beginning of the cache values' arrays.
+    void ResetIterators();
+
+    /// Returns the offset in number of rows into the cached values' buffer.
+    int ALWAYS_INLINE CurIdx() const {
+      return cur_expr_values_hash_ - expr_values_hash_array_.get();
+    }
+
+    /// Max amount of memory in bytes for caching evaluated expression values.
+    static const int MAX_EXPR_VALUES_ARRAY_SIZE = 256 << 10;
+
+    /// Maximum number of rows of expressions evaluation states which this
+    /// ExprValuesCache can cache.
+    int capacity_;
+
+    /// Byte size of a row of evaluated expression values. Never changes once 
set,
+    /// can be used for constant substitution during codegen.
+    int expr_values_bytes_per_row_;
+
+    /// Number of build/probe expressions.
+    int num_exprs_;
+
+    /// Pointer into 'expr_values_array_' for the current row's expression 
values.
+    uint8_t* cur_expr_values_;
+
+    /// Pointer into 'expr_values_null_array_' for the current row's nullness 
of each
+    /// expression value.
+    uint8_t* cur_expr_values_null_;
+
+    /// Pointer into 'expr_hash_value_array_' for the hash value of current 
row's
+    /// expression values.
+    uint32_t* cur_expr_values_hash_;
+
+    /// Pointer to the buffer one beyond the end of the last entry of cached 
expressions'
+    /// hash values.
+    uint32_t* cur_expr_values_hash_end_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of evaluated 
expression
+    /// values. Each row consumes 'expr_values_bytes_per_row_' number of bytes.
+    boost::scoped_array<uint8_t> expr_values_array_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of null 
booleans.
+    /// Each row contains 'num_exprs_' booleans to indicate nullness of 
expression values.
+    /// Used when the hash table supports NULL. Use 'uint8_t' to guarantee 
each entry is 1
+    /// byte as sizeof(bool) is implementation dependent. The IR depends on 
this
+    /// assumption.
+    boost::scoped_array<uint8_t> expr_values_null_array_;
+
+    /// Array for caching up to 'capacity_' number of rows worth of hashed 
values.
+    boost::scoped_array<uint32_t> expr_values_hash_array_;
+
+    /// One bit for each row. A bit is set if that row is not hashed as it's 
evaluated
+    /// to NULL but the hash table doesn't support NULL. Such rows may still 
be included
+    /// in outputs for certain join types (e.g. left anti joins).
+    Bitmap null_bitmap_;
+
+    /// Maps from expression index to the byte offset into a row of expression 
values.
+    /// One entry per build/probe expression.
+    std::vector<int> expr_values_offsets_;
+
+    /// Byte offset into 'cur_expr_values_' that begins the variable length 
results for
+    /// a row. If -1, there are no variable length slots. Never changes once 
set, can be
+    /// constant substituted with codegen.
+    int var_result_offset_;
+  };
+
+  ExprValuesCache* ALWAYS_INLINE expr_values_cache() { return 
&expr_values_cache_; }
+
  private:
   friend class HashTable;
   friend class HashTableTest_HashEmpty_Test;
 
+  /// Allocate various buffers for storing expression evaluation results, hash 
values,
+  /// null bits etc. Returns error if allocation causes query memory limit to 
be exceeded.
+  Status Init(RuntimeState* state, int num_build_tuples);
+
   /// Compute the hash of the values in expr_values_buffer_.
   /// This will be replaced by codegen.  We don't want this inlined for 
replacing
   /// with codegen'd functions so the function name does not change.
-  uint32_t IR_NO_INLINE HashCurrentRow() const {
-    DCHECK_LT(level_, seeds_.size());
-    if (var_result_begin_ == -1) {
-      /// This handles NULLs implicitly since a constant seed value was put
-      /// into results buffer for nulls.
-      /// TODO: figure out which hash function to use. We need to generate 
uncorrelated
-      /// hashes by changing just the seed. CRC does not have this property 
and FNV is
-      /// okay. We should switch to something else.
-      return Hash(expr_values_buffer_, results_buffer_size_, seeds_[level_]);
-    } else {
-      return HashTableCtx::HashVariableLenRow();
-    }
-  }
+  uint32_t IR_NO_INLINE HashCurrentRow() const;
 
   /// Wrapper function for calling correct HashUtil function in non-codegen'd 
case.
-  uint32_t inline Hash(const void* input, int len, uint32_t hash) const {
-    /// Use CRC hash at first level for better performance. Switch to murmur 
hash at
-    /// subsequent levels since CRC doesn't randomize well with different seed 
inputs.
-    if (level_ == 0) return HashUtil::Hash(input, len, hash);
-    return HashUtil::MurmurHash2_64(input, len, hash);
-  }
+  uint32_t Hash(const void* input, int len, uint32_t hash) const;
 
-  /// Evaluate 'row' over build exprs caching the results in 
'expr_values_buffer_' This
+  /// Evaluate 'row' over build exprs caching the results in 
'cur_expr_values_' This
   /// will be replaced by codegen.  We do not want this function inlined when 
cross
   /// compiled because we need to be able to differentiate between 
EvalBuildRow and
   /// EvalProbeRow by name and the build/probe exprs are baked into the 
codegen'd
@@ -218,7 +392,7 @@ class HashTableCtx {
     return EvalRow(row, build_expr_ctxs_);
   }
 
-  /// Evaluate 'row' over probe exprs caching the results in 
'expr_values_buffer_'
+  /// Evaluate 'row' over probe exprs caching the results in 'cur_expr_values_'
   /// This will be replaced by codegen.
   bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
     return EvalRow(row, probe_expr_ctxs_);
@@ -228,15 +402,15 @@ class HashTableCtx {
   /// fields (e.g. strings).
   uint32_t HashVariableLenRow() const;
 
-  /// Evaluate the exprs over row and cache the results in 
'expr_values_buffer_'.
+  /// Evaluate the exprs over row and cache the results in 'cur_expr_values_'.
   /// Returns whether any expr evaluated to NULL.
   /// This will be replaced by codegen.
   bool EvalRow(TupleRow* row, const std::vector<ExprContext*>& ctxs);
 
   /// Returns true if the values of build_exprs evaluated over 'build_row' 
equal the
-  /// values cached in 'expr_values_buffer_'.  This will be replaced by 
codegen.
+  /// values cached in 'cur_expr_values_'.  This will be replaced by codegen.
   /// FORCE_NULL_EQUALITY is true if all nulls should be treated as equal, 
regardless
-  /// of the values of finds_nulls_
+  /// of the values of 'finds_nulls_'.
   template<bool FORCE_NULL_EQUALITY>
   bool IR_NO_INLINE Equals(TupleRow* build_row) const;
 
@@ -263,29 +437,16 @@ class HashTableCtx {
   /// The seeds to use for hashing. Indexed by the level.
   std::vector<uint32_t> seeds_;
 
-  /// Cache of exprs values for the current row being evaluated.  This can 
either
-  /// be a build row (during Insert()) or probe row (during FindProbeRow()).
-  std::vector<int> expr_values_buffer_offsets_;
-
-  /// Byte offset into 'expr_values_buffer_' that begins the variable length 
results.
-  /// If -1, there are no variable length slots. Never changes once set, can 
be removed
-  /// with codegen.
-  int var_result_begin_;
-
-  /// Byte size of 'expr_values_buffer_'. Never changes once set, can be 
removed with
-  /// codegen.
-  int results_buffer_size_;
-
-  /// Buffer to store evaluated expr results.  This address must not change 
once
-  /// allocated since the address is baked into the codegen.
-  uint8_t* expr_values_buffer_;
-
-  /// Use bytes instead of bools to be compatible with llvm.  This address must
-  /// not change once allocated.
-  uint8_t* expr_value_null_bits_;
+  /// The ExprValuesCache for caching expression evaluation results, null 
bytes and hash
+  /// values for rows. Used to store results of batch evaluations of rows.
+  ExprValuesCache expr_values_cache_;
 
   /// Scratch buffer to generate rows on the fly.
   TupleRow* scratch_row_;
+
+  /// Memory tracker of the exec node which owns this hash table context. 
Account the
+  /// memory usage of expression values cache towards it.
+  MemTracker* tracker_;
 };
 
 /// The hash table consists of a contiguous array of buckets that contain a 
pointer to the
@@ -381,26 +542,27 @@ class HashTable {
   /// the insert fails and this function returns false.
   /// Used during the build phase of hash joins.
   bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
-      const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash);
+      const BufferedTupleStream::RowIdx& idx, TupleRow* row);
 
   /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
   template<const bool READ>
   void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
 
-  /// Returns an iterator to the bucket matching the last row evaluated in 
'ht_ctx'.
-  /// Returns HashTable::End() if no match is found. The iterator can be 
iterated until
-  /// HashTable::End() to find all the matching rows. Advancing the returned 
iterator will
-  /// go to the next matching row. The matching rows do not need to be 
evaluated since all
-  /// the nodes of a bucket are duplicates. One scan can be in progress for 
each 'ht_ctx'.
-  /// Used during the probe phase of hash joins.
-  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx, uint32_t hash);
+  /// Returns an iterator to the bucket that matches the probe expression 
results that
+  /// are cached at the current position of the ExprValuesCache in 'ht_ctx'. 
Assumes that
+  /// the ExprValuesCache was filled using EvalAndHashProbe(). Returns 
HashTable::End()
+  /// if no match is found. The iterator can be iterated until 
HashTable::End() to find
+  /// all the matching rows. Advancing the returned iterator will go to the 
next matching
+  /// row. The matching rows do not need to be evaluated since all the nodes 
of a bucket
+  /// are duplicates. One scan can be in progress for each 'ht_ctx'. Used in 
the probe
+  /// phase of hash joins.
+  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx);
 
   /// If a match is found in the table, return an iterator as in 
FindProbeRow(). If a
   /// match was not present, return an iterator pointing to the empty bucket 
where the key
   /// should be inserted. Returns End() if the table is full. The caller can 
set the data
   /// in the bucket using a Set*() method on the iterator.
-  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, uint32_t 
hash,
-      bool* found);
+  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, bool* 
found);
 
   /// Returns number of elements inserted in the hash table
   int64_t size() const {
@@ -531,6 +693,10 @@ class HashTable {
     /// Returns true if this iterator is at the end, i.e. GetRow() cannot be 
called.
     bool ALWAYS_INLINE AtEnd() const { return bucket_idx_ == BUCKET_NOT_FOUND; 
}
 
+    /// Prefetch the hash table bucket which the iterator is pointing to now.
+    template<const bool READ>
+    void IR_ALWAYS_INLINE PrefetchBucket();
+
    private:
     friend class HashTable;
 
@@ -579,28 +745,24 @@ class HashTable {
   /// Using the returned index value, the caller can create an iterator that 
can be
   /// iterated until End() to find all the matching rows.
   ///
-  /// If 'row' is not NULL, 'row' will be evaluated once against either the 
build or
-  /// probe exprs (determined by the parameter 'is_build') before calling 
Equals().
-  /// If 'row' is NULL, EvalAndHashBuild() or EvalAndHashProbe() must have 
been called
-  /// before calling this function.
+  /// EvalAndHashBuild() or EvalAndHashProbe() must have been called before 
calling
+  /// this function. The values of the expression values cache in 'ht_ctx' 
will be
+  /// used to probe the hash table.
   ///
   /// 'FORCE_NULL_EQUALITY' is true if NULLs should always be considered equal 
when
   /// comparing two rows.
   ///
-  /// 'is_build' indicates which of build or probe exprs is used for lazy 
evaluation.
-  /// 'row' is the row being probed against the hash table. Used for lazy 
evaluation.
   /// 'hash' is the hash computed by EvalAndHashBuild() or EvalAndHashProbe().
   /// 'found' indicates that a bucket that contains an equal row is found.
   ///
   /// There are wrappers of this function that perform the Find and Insert 
logic.
   template <bool FORCE_NULL_EQUALITY>
-  int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets, bool 
is_build,
-      HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool* found);
+  int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets,
+      HashTableCtx* ht_ctx, uint32_t hash, bool* found);
 
   /// Performs the insert logic. Returns the HtData* of the bucket or 
duplicate node
   /// where the data should be inserted. Returns NULL if the insert was not 
successful.
-  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, TupleRow* row,
-      uint32_t hash);
+  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx);
 
   /// Updates 'bucket_idx' to the index of the next non-empty bucket. If the 
bucket has
   /// duplicates, 'node' will be pointing to the head of the linked list of 
duplicates.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index 0d4b1b6..6d33869 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -23,23 +23,30 @@
 
 namespace impala {
 
-inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row, uint32_t* hash) {
+inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row) {
   bool has_null = EvalBuildRow(row);
   if (!stores_nulls_ && has_null) return false;
-  *hash = HashCurrentRow();
+  expr_values_cache_.SetExprValuesHash(HashCurrentRow());
   return true;
 }
 
-inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row, uint32_t* hash) {
+inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row) {
   bool has_null = EvalProbeRow(row);
   if (has_null && !(stores_nulls_ && finds_some_nulls_)) return false;
-  *hash = HashCurrentRow();
+  expr_values_cache_.SetExprValuesHash(HashCurrentRow());
   return true;
 }
 
+inline void HashTableCtx::ExprValuesCache::NextRow() {
+  cur_expr_values_ += expr_values_bytes_per_row_;
+  cur_expr_values_null_ += num_exprs_;
+  ++cur_expr_values_hash_;
+  DCHECK_LE(cur_expr_values_hash_ - expr_values_hash_array_.get(), capacity_);
+}
+
 template <bool FORCE_NULL_EQUALITY>
 inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
-    bool is_build, HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool* 
found) {
+    HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
   DCHECK(buckets != NULL);
   DCHECK_GT(num_buckets, 0);
   *found = false;
@@ -49,20 +56,10 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t 
num_buckets,
   // for knowing when to exit the loop (e.g. by capping the total travel 
length). In case
   // of quadratic probing it is also used for calculating the length of the 
next jump.
   int64_t step = 0;
-  bool need_eval = row != NULL;
   do {
     Bucket* bucket = &buckets[bucket_idx];
     if (LIKELY(!bucket->filled)) return bucket_idx;
     if (hash == bucket->hash) {
-      // Evaluate 'row' if needed before calling Equals() for the first time 
in this loop.
-      if (need_eval) {
-        if (is_build) {
-          ht_ctx->EvalBuildRow(row);
-        } else {
-          ht_ctx->EvalProbeRow(row);
-        }
-        need_eval = false;
-      }
       if (ht_ctx != NULL &&
           ht_ctx->Equals<FORCE_NULL_EQUALITY>(GetRow(bucket, 
ht_ctx->scratch_row_))) {
         *found = true;
@@ -89,12 +86,11 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t 
num_buckets,
   return Iterator::BUCKET_NOT_FOUND;
 }
 
-inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx,
-    TupleRow* row, uint32_t hash) {
+inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
   ++num_probes_;
   bool found = false;
-  int64_t bucket_idx =
-      Probe<true>(buckets_, num_buckets_, true, ht_ctx, row, hash, &found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, 
&found);
   DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
   if (found) {
     // We need to insert a duplicate node, note that this may fail to allocate 
memory.
@@ -108,8 +104,8 @@ inline HashTable::HtData* 
HashTable::InsertInternal(HashTableCtx* ht_ctx,
 }
 
 inline bool HashTable::Insert(HashTableCtx* ht_ctx,
-    const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash) {
-  HtData* htdata = InsertInternal(ht_ctx, row, hash);
+    const BufferedTupleStream::RowIdx& idx, TupleRow* row) {
+  HtData* htdata = InsertInternal(ht_ctx);
   // If successful insert, update the contents of the newly inserted entry 
with 'idx'.
   if (LIKELY(htdata != NULL)) {
     if (stores_tuples_) {
@@ -133,11 +129,11 @@ inline void HashTable::PrefetchBucket(uint32_t hash) {
   __builtin_prefetch(&buckets_[bucket_idx], READ ? 0 : 1, 1);
 }
 
-inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx, 
uint32_t hash) {
+inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx) {
   ++num_probes_;
   bool found = false;
-  int64_t bucket_idx =
-      Probe<false>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, &found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<false>(buckets_, num_buckets_, ht_ctx, hash, 
&found);
   if (found) {
     return Iterator(this, ht_ctx->scratch_row(), bucket_idx,
         buckets_[bucket_idx].bucketData.duplicates);
@@ -147,10 +143,10 @@ inline HashTable::Iterator 
HashTable::FindProbeRow(HashTableCtx* ht_ctx, uint32_
 
 // TODO: support lazy evaluation like HashTable::Insert().
 inline HashTable::Iterator HashTable::FindBuildRowBucket(
-    HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+    HashTableCtx* ht_ctx, bool* found) {
   ++num_probes_;
-  int64_t bucket_idx =
-      Probe<true>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, found);
+  uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
+  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, 
found);
   DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND) 
?
       buckets_[bucket_idx].bucketData.duplicates : NULL;
   return Iterator(this, ht_ctx->scratch_row(), bucket_idx, duplicates);
@@ -318,6 +314,16 @@ inline void HashTable::Iterator::SetAtEnd() {
   node_ = NULL;
 }
 
+template<const bool READ>
+inline void HashTable::Iterator::PrefetchBucket() {
+  if (LIKELY(!AtEnd())) {
+    // HashTable::PrefetchBucket() takes a hash value to index into the hash 
bucket
+    // array. Passing 'bucket_idx_' here is sufficient.
+    DCHECK_EQ((bucket_idx_ & ~(table_->num_buckets_ - 1)), 0);
+    table_->PrefetchBucket<READ>(bucket_idx_);
+  }
+}
+
 inline void HashTable::Iterator::Next() {
   DCHECK(!AtEnd());
   if (table_->buckets_[bucket_idx_].hasDuplicates && node_->next != NULL) {


Reply via email to