IMPALA-3286: Software prefetching for hash table build.
This change pipelines the code which builds the hash table.
This is based on the idea which Mostafa presented earlier.
Essentially, the pipelined code will first evaluate all the
rows to be inserted, compute their hash values and prefetch
the corresponding hash table buckets before going through
all the rows again to insert them into the hash table. This
change also introduces lazy evaluation of the build side
expression in Equals() to avoid unnecessary build side
expression evaluation for the second time in case the hash
table bucket is empty or the hash doesn't match due to
collision.
With this change, the hash table build time of a self-join
with lineitem reduces by more than half (going from 10.5s to 4.5s).
The overall query time drops from 37.28s to 31.15s (~16% reduction).
select count(*) from lineitem o1, lineitem o2
where o1.l_orderkey = o2.l_orderkey and
o1.l_linenumber = o2.l_linenumber
TPCH(15) also improves by 2.5% overall, with certain queries
improving up to 8%:
+----------+-----------------------+---------+------------+------------+----------------+
| Workload | File Format | Avg (s) | Delta(Avg) | GeoMean(s) |
Delta(GeoMean) |
+----------+-----------------------+---------+------------+------------+----------------+
| TPCH(15) | parquet / none / none | 14.34 | -2.49% | 9.36 | -1.65%
|
+----------+-----------------------+---------+------------+------------+----------------+
+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload | Query | File Format | Avg(s) | Base Avg(s) |
Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TPCH(15) | TPCH-Q1 | parquet / none / none | 8.44 | 8.05 | +4.92%
| 2.89% | 1.50% | 1 | 10 |
| TPCH(15) | TPCH-Q11 | parquet / none / none | 1.85 | 1.76 | +4.86%
| 3.88% | 3.93% | 1 | 10 |
| TPCH(15) | TPCH-Q2 | parquet / none / none | 2.90 | 2.78 | +4.41%
| 8.68% | * 15.78% * | 1 | 10 |
| TPCH(15) | TPCH-Q19 | parquet / none / none | 39.46 | 38.53 | +2.40%
| 2.21% | 2.23% | 1 | 10 |
| TPCH(15) | TPCH-Q16 | parquet / none / none | 1.90 | 1.86 | +1.81%
| 2.54% | 2.74% | 1 | 10 |
| TPCH(15) | TPCH-Q15 | parquet / none / none | 5.50 | 5.43 | +1.32%
| 2.62% | 3.34% | 1 | 10 |
| TPCH(15) | TPCH-Q6 | parquet / none / none | 3.03 | 3.01 | +0.61%
| 3.54% | 2.14% | 1 | 10 |
| TPCH(15) | TPCH-Q17 | parquet / none / none | 31.22 | 31.13 | +0.29%
| 0.32% | 0.49% | 1 | 10 |
| TPCH(15) | TPCH-Q14 | parquet / none / none | 3.63 | 3.64 | -0.21%
| 2.22% | 2.70% | 1 | 10 |
| TPCH(15) | TPCH-Q12 | parquet / none / none | 3.88 | 3.89 | -0.31%
| 1.90% | 1.82% | 1 | 10 |
| TPCH(15) | TPCH-Q7 | parquet / none / none | 26.25 | 26.64 | -1.50%
| 2.30% | 2.40% | 1 | 10 |
| TPCH(15) | TPCH-Q20 | parquet / none / none | 6.26 | 6.42 | -2.45%
| 1.44% | 1.81% | 1 | 10 |
| TPCH(15) | TPCH-Q9 | parquet / none / none | 30.56 | 31.43 | -2.77%
| 0.41% | 0.64% | 1 | 10 |
| TPCH(15) | TPCH-Q13 | parquet / none / none | 13.53 | 13.94 | -3.00%
| 1.02% | 0.50% | 1 | 10 |
| TPCH(15) | TPCH-Q8 | parquet / none / none | 24.93 | 25.76 | -3.22%
| 0.95% | 1.00% | 1 | 10 |
| TPCH(15) | TPCH-Q10 | parquet / none / none | 6.58 | 6.89 | -4.50%
| 1.37% | 1.24% | 1 | 10 |
| TPCH(15) | TPCH-Q18 | parquet / none / none | 31.44 | 33.12 | -5.05%
| 0.50% | 0.66% | 1 | 10 |
| TPCH(15) | TPCH-Q21 | parquet / none / none | 31.56 | 33.55 | -5.92%
| 4.31% | 5.01% | 1 | 10 |
| TPCH(15) | TPCH-Q22 | parquet / none / none | 4.17 | 4.44 | -5.98%
| 0.59% | 0.75% | 1 | 10 |
| TPCH(15) | TPCH-Q5 | parquet / none / none | 14.67 | 15.66 | -6.34%
| 8.08% | 1.13% | 1 | 10 |
| TPCH(15) | TPCH-Q3 | parquet / none / none | 11.25 | 12.01 | -6.38%
| 1.17% | 0.85% | 1 | 10 |
| TPCH(15) | TPCH-Q4 | parquet / none / none | 12.38 | 13.49 | -8.19%
| 1.44% | 0.70% | 1 | 10 |
+----------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
Change-Id: Ib85e7fc162ad25c849b9e716b629e226697cd940
Reviewed-on: http://gerrit.cloudera.org:8080/2896
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e32720e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e32720e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e32720e0
Branch: refs/heads/master
Commit: e32720e02241e70c2af2c6e677ea6eb6563d8f39
Parents: 01baf57
Author: Michael Ho <[email protected]>
Authored: Tue Apr 26 00:24:53 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu May 12 14:17:51 2016 -0700
----------------------------------------------------------------------
be/src/exec/hash-table-test.cc | 20 ++++--
be/src/exec/hash-table.cc | 12 ++--
be/src/exec/hash-table.h | 74 ++++++++++++--------
be/src/exec/hash-table.inline.h | 77 +++++++++++++--------
be/src/exec/partitioned-aggregation-node-ir.cc | 22 +++---
be/src/exec/partitioned-aggregation-node.h | 6 +-
be/src/exec/partitioned-hash-join-node-ir.cc | 46 ++++++++----
be/src/exec/partitioned-hash-join-node.cc | 24 +++----
be/src/exec/partitioned-hash-join-node.h | 21 ++++--
be/src/runtime/row-batch.h | 9 ++-
10 files changed, 190 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index c203ba0..bef3f1e 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -242,7 +242,9 @@ class HashTableTest : public testing::Test {
for (int i = 0; i < 2; ++i) {
uint32_t hash = 0;
if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
- bool inserted = hash_table->Insert(&ht_ctx, build_rows[i]->GetTuple(0),
hash);
+ BufferedTupleStream::RowIdx dummy_row_idx;
+ EXPECT_TRUE(hash_table->stores_tuples_);
+ bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx,
build_rows[i], hash);
EXPECT_TRUE(inserted);
}
EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1);
@@ -275,7 +277,9 @@ class HashTableTest : public testing::Test {
ASSERT_TRUE(success);
for (int i = 0; i < 5; ++i) {
if (!ht_ctx.EvalAndHashBuild(build_rows[i], &hash)) continue;
- bool inserted = hash_table->Insert(&ht_ctx, build_rows[i]->GetTuple(0),
hash);
+ BufferedTupleStream::RowIdx dummy_row_idx;
+ EXPECT_TRUE(hash_table->stores_tuples_);
+ bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx,
build_rows[i], hash);
EXPECT_TRUE(inserted);
}
EXPECT_EQ(hash_table->size(), 5);
@@ -333,7 +337,9 @@ class HashTableTest : public testing::Test {
for (int i = 0; i < val; ++i) {
TupleRow* row = CreateTupleRow(val);
if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
- hash_table->Insert(&ht_ctx, row->GetTuple(0), hash);
+ BufferedTupleStream::RowIdx dummy_row_idx;
+ EXPECT_TRUE(hash_table->stores_tuples_);
+ hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
build_rows.push_back(row);
probe_rows[val].expected_build_rows.push_back(row);
}
@@ -390,7 +396,9 @@ class HashTableTest : public testing::Test {
for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
TupleRow* row = CreateTupleRow(build_row_val);
if (!ht_ctx.EvalAndHashBuild(row, &hash)) continue;
- bool inserted = hash_table->Insert(&ht_ctx, row->GetTuple(0), hash);
+ BufferedTupleStream::RowIdx dummy_row_idx;
+ EXPECT_TRUE(hash_table->stores_tuples_);
+ bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
if (!inserted) goto done_inserting;
}
expected_size += num_to_add;
@@ -438,7 +446,9 @@ class HashTableTest : public testing::Test {
// Insert using both Insert() and FindBucket() methods.
if (build_row_val % 2 == 0) {
- bool inserted = hash_table->Insert(&ht_ctx, row->GetTuple(0), hash);
+ BufferedTupleStream::RowIdx dummy_row_idx;
+ EXPECT_TRUE(hash_table->stores_tuples_);
+ bool inserted = hash_table->Insert(&ht_ctx, dummy_row_idx, row, hash);
EXPECT_TRUE(inserted);
} else {
iter = hash_table->FindBuildRowBucket(&ht_ctx, hash, &found);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 0fae3c7..9cab765 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -93,7 +93,7 @@ HashTableCtx::HashTableCtx(const std::vector<ExprContext*>&
build_expr_ctxs,
finds_some_nulls_(std::accumulate(
finds_nulls_.begin(), finds_nulls_.end(), false,
std::logical_or<bool>())),
level_(0),
- row_(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) *
num_build_tuples))) {
+ scratch_row_(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) *
num_build_tuples))) {
DCHECK(!finds_some_nulls_ || stores_nulls_);
// Compute the layout and buffer size to store the evaluated expr results
DCHECK_EQ(build_expr_ctxs_.size(), probe_expr_ctxs_.size());
@@ -124,11 +124,11 @@ void HashTableCtx::Close() {
DCHECK(expr_value_null_bits_ != NULL);
delete[] expr_value_null_bits_;
expr_value_null_bits_ = NULL;
- free(row_);
- row_ = NULL;
+ free(scratch_row_);
+ scratch_row_ = NULL;
}
-bool HashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs)
const {
+bool HashTableCtx::EvalRow(TupleRow* row, const vector<ExprContext*>& ctxs) {
bool has_null = false;
for (int i = 0; i < ctxs.size(); ++i) {
void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
@@ -306,7 +306,7 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const
HashTableCtx* ht_ctx) {
Bucket* bucket_to_copy = &buckets_[iter.bucket_idx_];
bool found = false;
int64_t bucket_idx =
- Probe<true>(new_buckets, num_buckets, NULL, bucket_to_copy->hash,
&found);
+ Probe<true>(new_buckets, num_buckets, false, NULL, NULL,
bucket_to_copy->hash, &found);
DCHECK(!found);
DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even
though "
" there are free buckets. " << num_buckets << " " <<
num_filled_buckets_;
@@ -874,7 +874,7 @@ Status HashTableCtx::CodegenEquals(RuntimeState* state,
bool force_null_equality
codegen->CastPtrToLlvmPtr(codegen->ptr_type(), null_byte_loc);
Value* null_byte = builder.CreateLoad(llvm_null_byte_loc);
row_is_null = builder.CreateICmpNE(null_byte,
-
codegen->GetIntConstant(TYPE_TINYINT, 0));
+ codegen->GetIntConstant(TYPE_TINYINT, 0));
}
// Get llvm value for row_val from 'expr_values_buffer_'
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 0b739eb..e496cde 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -130,9 +130,10 @@ class HashTableCtx {
void set_level(int level);
int ALWAYS_INLINE level() const { return level_; }
+
uint32_t ALWAYS_INLINE seed(int level) { return seeds_.at(level); }
- TupleRow* ALWAYS_INLINE row() const { return row_; }
+ TupleRow* ALWAYS_INLINE scratch_row() const { return scratch_row_; }
/// Returns the results of the exprs at 'expr_idx' evaluated over the last
row
/// processed.
@@ -153,8 +154,8 @@ class HashTableCtx {
/// contains NULL.
/// These need to be inlined in the IR module so we can find and replace the
calls to
/// EvalBuildRow()/EvalProbeRow().
- bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row, uint32_t* hash) const;
- bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row, uint32_t* hash) const;
+ bool IR_ALWAYS_INLINE EvalAndHashBuild(TupleRow* row, uint32_t* hash);
+ bool IR_ALWAYS_INLINE EvalAndHashProbe(TupleRow* row, uint32_t* hash);
int ALWAYS_INLINE results_buffer_size() const { return results_buffer_size_;
}
@@ -165,8 +166,8 @@ class HashTableCtx {
/// Codegen for evaluating a TupleRow and comparing equality against
/// 'expr_values_buffer_'. Function signature matches HashTable::Equals().
- /// force_null_equality is true if the generated equality function should
treat all
- /// NULLs as equal. See the template parameter to HashTable::Equals().
+ /// 'force_null_equality' is true if the generated equality function should
treat
+ /// all NULLs as equal. See the template parameter to HashTable::Equals().
Status CodegenEquals(RuntimeState* state, bool force_null_equality,
llvm::Function** fn);
@@ -213,13 +214,13 @@ class HashTableCtx {
/// compiled because we need to be able to differentiate between
EvalBuildRow and
/// EvalProbeRow by name and the build/probe exprs are baked into the
codegen'd
/// function.
- bool IR_NO_INLINE EvalBuildRow(TupleRow* row) const {
+ bool IR_NO_INLINE EvalBuildRow(TupleRow* row) {
return EvalRow(row, build_expr_ctxs_);
}
/// Evaluate 'row' over probe exprs caching the results in
'expr_values_buffer_'
/// This will be replaced by codegen.
- bool IR_NO_INLINE EvalProbeRow(TupleRow* row) const {
+ bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
return EvalRow(row, probe_expr_ctxs_);
}
@@ -230,15 +231,18 @@ class HashTableCtx {
/// Evaluate the exprs over row and cache the results in
'expr_values_buffer_'.
/// Returns whether any expr evaluated to NULL.
/// This will be replaced by codegen.
- bool EvalRow(TupleRow* row, const std::vector<ExprContext*>& ctxs) const;
+ bool EvalRow(TupleRow* row, const std::vector<ExprContext*>& ctxs);
/// Returns true if the values of build_exprs evaluated over 'build_row'
equal the
- /// values cached in expr_values_buffer_. This will be replaced by
- /// codegen. FORCE_NULL_EQUALITY is true if all nulls should be treated as
equal,
- /// regardless of the values of finds_nulls_
+ /// values cached in 'expr_values_buffer_'. This will be replaced by
codegen.
+ /// FORCE_NULL_EQUALITY is true if all nulls should be treated as equal,
regardless
+ /// of the values of finds_nulls_
template<bool FORCE_NULL_EQUALITY>
bool IR_NO_INLINE Equals(TupleRow* build_row) const;
+ /// Cross-compiled function to access member variables used in
CodegenHashCurrentRow().
+ uint32_t GetHashSeed() const;
+
const std::vector<ExprContext*>& build_expr_ctxs_;
const std::vector<ExprContext*>& probe_expr_ctxs_;
@@ -281,10 +285,7 @@ class HashTableCtx {
uint8_t* expr_value_null_bits_;
/// Scratch buffer to generate rows on the fly.
- TupleRow* row_;
-
- /// Cross-compiled functions to access member variables used in
CodegenHashCurrentRow().
- uint32_t GetHashSeed() const;
+ TupleRow* scratch_row_;
};
/// The hash table consists of a contiguous array of buckets that contain a
pointer to the
@@ -382,9 +383,9 @@ class HashTable {
bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash);
- /// Same as Insert() but for inserting a single Tuple. The 'tuple' is not
copied by
- /// the hash table and the caller must guarantee it stays in memory.
- bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx, Tuple* tuple, uint32_t
hash);
+ /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
+ template<const bool READ>
+ void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
/// Returns an iterator to the bucket matching the last row evaluated in
'ht_ctx'.
/// Returns HashTable::End() if no match is found. The iterator can be
iterated until
@@ -392,13 +393,13 @@ class HashTable {
/// go to the next matching row. The matching rows do not need to be
evaluated since all
/// the nodes of a bucket are duplicates. One scan can be in progress for
each 'ht_ctx'.
/// Used during the probe phase of hash joins.
- Iterator IR_ALWAYS_INLINE FindProbeRow(const HashTableCtx* ht_ctx, uint32_t
hash);
+ Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx, uint32_t hash);
/// If a match is found in the table, return an iterator as in
FindProbeRow(). If a
/// match was not present, return an iterator pointing to the empty bucket
where the key
/// should be inserted. Returns End() if the table is full. The caller can
set the data
/// in the bucket using a Set*() method on the iterator.
- Iterator IR_ALWAYS_INLINE FindBuildRowBucket(const HashTableCtx* ht_ctx,
uint32_t hash,
+ Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, uint32_t
hash,
bool* found);
/// Returns number of elements inserted in the hash table
@@ -484,9 +485,11 @@ class HashTable {
static const int64_t BUCKET_NOT_FOUND = -1;
public:
-
- ALWAYS_INLINE
- Iterator() : table_(NULL), row_(NULL), bucket_idx_(BUCKET_NOT_FOUND),
node_(NULL) { }
+ IR_ALWAYS_INLINE Iterator() :
+ table_(NULL),
+ scratch_row_(NULL),
+ bucket_idx_(BUCKET_NOT_FOUND),
+ node_(NULL) { }
/// Iterates to the next element. It should be called only if !AtEnd().
void IR_ALWAYS_INLINE Next();
@@ -534,13 +537,15 @@ class HashTable {
ALWAYS_INLINE
Iterator(HashTable* table, TupleRow* row, int bucket_idx, DuplicateNode*
node)
: table_(table),
- row_(row),
+ scratch_row_(row),
bucket_idx_(bucket_idx),
node_(node) {
}
HashTable* table_;
- TupleRow* row_;
+
+ /// Scratch buffer to hold generated rows. Not owned.
+ TupleRow* scratch_row_;
/// Current bucket idx.
/// TODO: Use uint32_t?
@@ -573,20 +578,29 @@ class HashTable {
/// distance was traveled without finding either an empty or a matching
bucket.
/// Using the returned index value, the caller can create an iterator that
can be
/// iterated until End() to find all the matching rows.
- /// EvalAndHashBuild() or EvalAndHashProb(e) must have been called before
calling this.
+ ///
+ /// If 'row' is not NULL, 'row' will be evaluated once against either the
build or
+ /// probe exprs (determined by the parameter 'is_build') before calling
Equals().
+ /// If 'row' is NULL, EvalAndHashBuild() or EvalAndHashProbe() must have
been called
+ /// before calling this function.
+ ///
/// 'FORCE_NULL_EQUALITY' is true if NULLs should always be considered equal
when
/// comparing two rows.
- /// 'hash' must be the hash returned by these functions.
+ ///
+ /// 'is_build' indicates which of build or probe exprs is used for lazy
evaluation.
+ /// 'row' is the row being probed against the hash table. Used for lazy
evaluation.
+ /// 'hash' is the hash computed by EvalAndHashBuild() or EvalAndHashProbe().
/// 'found' indicates that a bucket that contains an equal row is found.
///
/// There are wrappers of this function that perform the Find and Insert
logic.
template <bool FORCE_NULL_EQUALITY>
- int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets,
- const HashTableCtx* ht_ctx, uint32_t hash, bool* found);
+ int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets, bool
is_build,
+ HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool* found);
/// Performs the insert logic. Returns the HtData* of the bucket or
duplicate node
/// where the data should be inserted. Returns NULL if the insert was not
successful.
- HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, uint32_t hash);
+ HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, TupleRow* row,
+ uint32_t hash);
/// Updates 'bucket_idx' to the index of the next non-empty bucket. If the
bucket has
/// duplicates, 'node' will be pointing to the head of the linked list of
duplicates.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index bd8ea9d..0d4b1b6 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -23,14 +23,14 @@
namespace impala {
-inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row, uint32_t* hash)
const {
+inline bool HashTableCtx::EvalAndHashBuild(TupleRow* row, uint32_t* hash) {
bool has_null = EvalBuildRow(row);
if (!stores_nulls_ && has_null) return false;
*hash = HashCurrentRow();
return true;
}
-inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row, uint32_t* hash)
const {
+inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row, uint32_t* hash) {
bool has_null = EvalProbeRow(row);
if (has_null && !(stores_nulls_ && finds_some_nulls_)) return false;
*hash = HashCurrentRow();
@@ -39,7 +39,7 @@ inline bool HashTableCtx::EvalAndHashProbe(TupleRow* row,
uint32_t* hash) const
template <bool FORCE_NULL_EQUALITY>
inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
- const HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+ bool is_build, HashTableCtx* ht_ctx, TupleRow* row, uint32_t hash, bool*
found) {
DCHECK(buckets != NULL);
DCHECK_GT(num_buckets, 0);
*found = false;
@@ -49,12 +49,22 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t
num_buckets,
// for knowing when to exit the loop (e.g. by capping the total travel
length). In case
// of quadratic probing it is also used for calculating the length of the
next jump.
int64_t step = 0;
+ bool need_eval = row != NULL;
do {
Bucket* bucket = &buckets[bucket_idx];
- if (!bucket->filled) return bucket_idx;
+ if (LIKELY(!bucket->filled)) return bucket_idx;
if (hash == bucket->hash) {
+ // Evaluate 'row' if needed before calling Equals() for the first time
in this loop.
+ if (need_eval) {
+ if (is_build) {
+ ht_ctx->EvalBuildRow(row);
+ } else {
+ ht_ctx->EvalProbeRow(row);
+ }
+ need_eval = false;
+ }
if (ht_ctx != NULL &&
- ht_ctx->Equals<FORCE_NULL_EQUALITY>(GetRow(bucket, ht_ctx->row_))) {
+ ht_ctx->Equals<FORCE_NULL_EQUALITY>(GetRow(bucket,
ht_ctx->scratch_row_))) {
*found = true;
return bucket_idx;
}
@@ -80,10 +90,11 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t
num_buckets,
}
inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx,
- uint32_t hash) {
+ TupleRow* row, uint32_t hash) {
++num_probes_;
bool found = false;
- int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash,
&found);
+ int64_t bucket_idx =
+ Probe<true>(buckets_, num_buckets_, true, ht_ctx, row, hash, &found);
DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
if (found) {
// We need to insert a duplicate node, note that this may fail to allocate
memory.
@@ -98,59 +109,65 @@ inline HashTable::HtData*
HashTable::InsertInternal(HashTableCtx* ht_ctx,
inline bool HashTable::Insert(HashTableCtx* ht_ctx,
const BufferedTupleStream::RowIdx& idx, TupleRow* row, uint32_t hash) {
- if (stores_tuples_) return Insert(ht_ctx, row->GetTuple(0), hash);
- HtData* htdata = InsertInternal(ht_ctx, hash);
+ HtData* htdata = InsertInternal(ht_ctx, row, hash);
// If successful insert, update the contents of the newly inserted entry
with 'idx'.
if (LIKELY(htdata != NULL)) {
- htdata->idx = idx;
+ if (stores_tuples_) {
+ htdata->tuple = row->GetTuple(0);
+ } else {
+ htdata->idx = idx;
+ }
return true;
}
return false;
}
-inline bool HashTable::Insert(HashTableCtx* ht_ctx, Tuple* tuple, uint32_t
hash) {
- DCHECK(stores_tuples_);
- HtData* htdata = InsertInternal(ht_ctx, hash);
- // If successful insert, update the contents of the newly inserted entry
with 'tuple'.
- if (LIKELY(htdata != NULL)) {
- htdata->tuple = tuple;
- return true;
- }
- return false;
+template<const bool READ>
+inline void HashTable::PrefetchBucket(uint32_t hash) {
+ int64_t bucket_idx = hash & (num_buckets_ - 1);
+ // Two optional arguments:
+ // 'rw': 1 means the memory access is write
+ // 'locality': 0-3. 0 means no temporal locality. 3 means high temporal
locality.
+ // On x86, they map to instructions prefetchnta and prefetch{2-0}
respectively.
+ // TODO: Reconsider the locality level with smaller prefetch batch size.
+ __builtin_prefetch(&buckets_[bucket_idx], READ ? 0 : 1, 1);
}
-inline HashTable::Iterator HashTable::FindProbeRow(const HashTableCtx* ht_ctx,
uint32_t hash) {
+inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx,
uint32_t hash) {
++num_probes_;
bool found = false;
- int64_t bucket_idx = Probe<false>(buckets_, num_buckets_, ht_ctx, hash,
&found);
+ int64_t bucket_idx =
+ Probe<false>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, &found);
if (found) {
- return Iterator(this, ht_ctx->row(), bucket_idx,
+ return Iterator(this, ht_ctx->scratch_row(), bucket_idx,
buckets_[bucket_idx].bucketData.duplicates);
}
return End();
}
+// TODO: support lazy evaluation like HashTable::Insert().
inline HashTable::Iterator HashTable::FindBuildRowBucket(
- const HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+ HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
++num_probes_;
- int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash,
found);
+ int64_t bucket_idx =
+ Probe<true>(buckets_, num_buckets_, false, ht_ctx, NULL, hash, found);
DuplicateNode* duplicates = LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND)
?
buckets_[bucket_idx].bucketData.duplicates : NULL;
- return Iterator(this, ht_ctx->row(), bucket_idx, duplicates);
+ return Iterator(this, ht_ctx->scratch_row(), bucket_idx, duplicates);
}
inline HashTable::Iterator HashTable::Begin(const HashTableCtx* ctx) {
int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
DuplicateNode* node = NULL;
NextFilledBucket(&bucket_idx, &node);
- return Iterator(this, ctx->row(), bucket_idx, node);
+ return Iterator(this, ctx->scratch_row(), bucket_idx, node);
}
inline HashTable::Iterator HashTable::FirstUnmatched(HashTableCtx* ctx) {
int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
DuplicateNode* node = NULL;
NextFilledBucket(&bucket_idx, &node);
- Iterator it(this, ctx->row(), bucket_idx, node);
+ Iterator it(this, ctx->scratch_row(), bucket_idx, node);
// Check whether the bucket, or its first duplicate node, is matched. If it
is not
// matched, then return. Otherwise, move to the first unmatched entry (node
or bucket).
Bucket* bucket = &buckets_[bucket_idx];
@@ -244,13 +261,13 @@ inline TupleRow* HashTable::GetRow(Bucket* bucket,
TupleRow* row) const {
inline TupleRow* HashTable::Iterator::GetRow() const {
DCHECK(!AtEnd());
DCHECK(table_ != NULL);
- DCHECK(row_ != NULL);
+ DCHECK(scratch_row_ != NULL);
Bucket* bucket = &table_->buckets_[bucket_idx_];
if (UNLIKELY(bucket->hasDuplicates)) {
DCHECK(node_ != NULL);
- return table_->GetRow(node_->htdata, row_);
+ return table_->GetRow(node_->htdata, scratch_row_);
} else {
- return table_->GetRow(bucket->bucketData.htdata, row_);
+ return table_->GetRow(bucket->bucketData.htdata, scratch_row_);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc
b/be/src/exec/partitioned-aggregation-node-ir.cc
index c7a6de9..45b5a0e 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -26,15 +26,15 @@ using namespace impala;
Status PartitionedAggregationNode::ProcessBatchNoGrouping(RowBatch* batch,
const HashTableCtx* ht_ctx) { // 'ht_ctx' is unused
Tuple* output_tuple = singleton_output_tuple_;
- FOREACH_ROW(batch, 0, row) {
- UpdateTuple(&agg_fn_ctxs_[0], output_tuple, row);
+ FOREACH_ROW(batch, 0, batch_iter) {
+ UpdateTuple(&agg_fn_ctxs_[0], output_tuple, batch_iter.Get());
}
return Status::OK();
}
template<bool AGGREGATED_ROWS>
Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
- const HashTableCtx* __restrict__ ht_ctx) {
+ HashTableCtx* __restrict__ ht_ctx) {
DCHECK(!hash_partitions_.empty());
DCHECK(!is_streaming_preagg_);
@@ -44,15 +44,15 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch*
batch,
// TODO: Once we have a histogram with the number of rows per partition, we
will have
// accurate resize calls.
RETURN_IF_ERROR(CheckAndResizeHashPartitions(batch->num_rows(), ht_ctx));
- FOREACH_ROW(batch, 0, row) {
- RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(row, ht_ctx));
+ FOREACH_ROW(batch, 0, batch_iter) {
+ RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
}
return Status::OK();
}
template<bool AGGREGATED_ROWS>
Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
- const HashTableCtx* __restrict__ ht_ctx) {
+ HashTableCtx* __restrict__ ht_ctx) {
uint32_t hash = 0;
if (AGGREGATED_ROWS) {
if (!ht_ctx->EvalAndHashBuild(row, &hash)) return Status::OK();
@@ -148,8 +148,9 @@ Status
PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
DCHECK_LE(in_batch->num_rows(), out_batch->capacity());
RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
- FOREACH_ROW(in_batch, 0, in_row) {
+ FOREACH_ROW(in_batch, 0, in_batch_iter) {
uint32_t hash;
+ TupleRow* in_row = in_batch_iter.Get();
if (!ht_ctx->EvalAndHashProbe(in_row, &hash)) continue;
const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
@@ -174,9 +175,9 @@ Status
PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
}
if (needs_serialize) {
- FOREACH_ROW(out_batch, 0, serialize_row) {
+ FOREACH_ROW(out_batch, 0, out_batch_iter) {
AggFnEvaluator::Serialize(aggregate_evaluators_, agg_fn_ctxs_,
- serialize_row->GetTuple(0));
+ out_batch_iter.Get()->GetTuple(0));
}
}
@@ -184,12 +185,13 @@ Status
PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
}
bool PartitionedAggregationNode::TryAddToHashTable(
- const HashTableCtx* __restrict__ ht_ctx, Partition* __restrict__ partition,
+ HashTableCtx* __restrict__ ht_ctx, Partition* __restrict__ partition,
TupleRow* __restrict__ in_row, uint32_t hash, int* __restrict__
remaining_capacity,
Status* status) {
DCHECK(remaining_capacity != NULL);
DCHECK_GE(*remaining_capacity, 0);
bool found;
+ // This is called from ProcessBatchStreaming() so the rows are not
aggregated.
HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx,
hash, &found);
Tuple* intermediate_tuple;
if (found) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h
b/be/src/exec/partitioned-aggregation-node.h
index 76f5b51..0b94511 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -463,13 +463,13 @@ class PartitionedAggregationNode : public ExecNode {
/// This function is replaced by codegen. It's inlined into
ProcessBatch_true/false in
/// the IR module. We pass in ht_ctx_.get() as an argument for performance.
template<bool AGGREGATED_ROWS>
- Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, const HashTableCtx*
ht_ctx);
+ Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, HashTableCtx* ht_ctx);
/// This function processes each individual row in ProcessBatch(). Must be
inlined into
/// ProcessBatch for codegen to substitute function calls with codegen'd
versions.
/// May spill partitions if not enough memory is available.
template<bool AGGREGATED_ROWS>
- Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, const HashTableCtx*
ht_ctx);
+ Status IR_ALWAYS_INLINE ProcessRow(TupleRow* row, HashTableCtx* ht_ctx);
/// Create a new intermediate tuple in partition, initialized with row.
ht_ctx is
/// the context for the partition's hash table and hash is the precomputed
hash of
@@ -534,7 +534,7 @@ class PartitionedAggregationNode : public ExecNode {
/// of how many more entries can be added to the hash table so we can avoid
retrying
/// inserts. It is decremented if an insert succeeds and set to zero if an
insert
/// fails. If an error occurs, returns false and sets 'status'.
- bool IR_ALWAYS_INLINE TryAddToHashTable(const HashTableCtx* ht_ctx,
+ bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx,
Partition* partition, TupleRow* in_row, uint32_t hash, int*
remaining_capacity,
Status* status);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc
b/be/src/exec/partitioned-hash-join-node-ir.cc
index 354003e..ef4c010 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -220,7 +220,7 @@ bool IR_ALWAYS_INLINE
PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
template<int const JoinOp>
bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
- const HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
+ HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
int* remaining_capacity, int num_other_join_conjuncts, Status* status) {
while (!probe_batch_iterator->AtEnd()) {
// Establish current_probe_row_ and find its corresponding partition.
@@ -279,7 +279,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
// codegen.
template<int const JoinOp>
int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
- const HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+ HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
@@ -348,7 +348,7 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch*
out_batch,
int PartitionedHashJoinNode::ProcessProbeBatch(
const TJoinOp::type join_op, RowBatch* out_batch,
- const HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+ HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
switch (join_op) {
case TJoinOp::INNER_JOIN:
return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx, status);
@@ -377,9 +377,10 @@ int PartitionedHashJoinNode::ProcessProbeBatch(
Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
bool build_filters) {
- FOREACH_ROW(build_batch, 0, build_row) {
+ FOREACH_ROW(build_batch, 0, build_batch_iter) {
DCHECK(build_status_.ok());
uint32_t hash;
+ TupleRow* build_row = build_batch_iter.Get();
if (!ht_ctx_->EvalAndHashBuild(build_row, &hash)) {
if (null_aware_partition_ != NULL) {
// TODO: remove with codegen/template
@@ -412,14 +413,35 @@ Status
PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
return Status::OK();
}
-bool PartitionedHashJoinNode::Partition::InsertBatch(HashTableCtx* ctx,
RowBatch* batch,
- const vector<BufferedTupleStream::RowIdx>& indices) {
- int num_rows = batch->num_rows();
- for (int i = 0; i < num_rows; ++i) {
- TupleRow* row = batch->GetRow(i);
- uint32_t hash = 0;
- if (!ctx->EvalAndHashBuild(row, &hash)) continue;
- if (UNLIKELY(!hash_tbl_->Insert(ctx, indices[i], row, hash))) return false;
+bool PartitionedHashJoinNode::Partition::InsertBatch(HashTableCtx* ht_ctx,
+ RowBatch* batch, const vector<BufferedTupleStream::RowIdx>& indices) {
+ DCHECK_LE(batch->num_rows(), hash_values_.size());
+ DCHECK_LE(batch->num_rows(), null_bitmap_.num_bits());
+ // Compute the hash values and prefetch the hash table buckets.
+ int i = 0;
+ uint32_t* hash_values = hash_values_.data();
+ null_bitmap_.SetAllBits(false);
+ FOREACH_ROW(batch, 0, batch_iter) {
+ if (ht_ctx->EvalAndHashBuild(batch_iter.Get(), &hash_values[i])) {
+ // TODO: Find the optimal prefetch batch size. This may be something
+ // processor dependent so we may need calibration at Impala startup time.
+ hash_tbl_->PrefetchBucket<false>(hash_values[i]);
+ } else {
+ null_bitmap_.Set<false>(i, true);
+ }
+ ++i;
+ }
+ // Do the insertion.
+ i = 0;
+ const BufferedTupleStream::RowIdx* row_idx = indices.data();
+ FOREACH_ROW(batch, 0, batch_iter) {
+ if (LIKELY(!null_bitmap_.Get<false>(i))) {
+ TupleRow* row = batch_iter.Get();
+ if (UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx[i], row,
hash_values[i]))) {
+ return false;
+ }
+ }
+ ++i;
}
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc
b/be/src/exec/partitioned-hash-join-node.cc
index 39fa1b3..1a21ecc 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -76,7 +76,6 @@ PartitionedHashJoinNode::~PartitionedHashJoinNode() {
DCHECK(null_probe_rows_ == NULL);
}
-
Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);
@@ -321,11 +320,12 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
}
PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
- PartitionedHashJoinNode* parent, int level)
+ PartitionedHashJoinNode* parent, int level)
: parent_(parent),
is_closed_(false),
is_spilled_(false),
- level_(level) {
+ level_(level),
+ null_bitmap_(state->batch_size()) {
build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
state->block_mgr(), parent_->block_mgr_client_,
true /* use_initial_small_buffers */, false /* read_write */);
@@ -334,6 +334,8 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState*
state,
state->block_mgr(), parent_->block_mgr_client_,
true /* use_initial_small_buffers */, false /* read_write */ );
DCHECK(probe_rows_ != NULL);
+ hash_values_.resize(state->batch_size());
+ null_bitmap_.SetAllBits(false);
}
PartitionedHashJoinNode::Partition::~Partition() {
@@ -465,16 +467,16 @@ Status
PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
<< build_rows()->RowConsumesMemory();
SCOPED_TIMER(parent_->build_timer_);
if (parent_->insert_batch_fn_ != NULL) {
- DCHECK(parent_->insert_batch_fn_level0_ != NULL);
- InsertBatchFn insert_batch_fn = NULL;
+ InsertBatchFn insert_batch_fn;
if (ctx->level() == 0) {
insert_batch_fn = parent_->insert_batch_fn_level0_;
} else {
insert_batch_fn = parent_->insert_batch_fn_;
}
- if (!insert_batch_fn(this, ctx, &batch, indices)) goto not_built;
+ DCHECK(insert_batch_fn != NULL);
+ if (UNLIKELY(!insert_batch_fn(this, ctx, &batch, indices))) goto
not_built;
} else {
- if (!InsertBatch(ctx, &batch, indices)) goto not_built;
+ if (UNLIKELY(!InsertBatch(ctx, &batch, indices))) goto not_built;
}
RETURN_IF_ERROR(state->GetQueryStatus());
parent_->FreeLocalAllocations();
@@ -1823,15 +1825,11 @@ Status
PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
// Use codegen'd EvalBuildRow() function
int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn,
"EvalBuildRow");
- DCHECK_EQ(replaced, 1);
+ DCHECK_EQ(replaced, 2);
// Use codegen'd Equals() function
replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn,
"Equals");
- // There are two Equals() calls because the HashTable::Insert() method that
takes a
- // RowIdx parameter either calls the other Insert() method or
InsertInternal()
- // directly. This generates two InsertInternal() calls in the IR => two
Probe() calls =>
- // two Equals() calls after inlining. Only one will actually be called.
- DCHECK_EQ(replaced, 2);
+ DCHECK_EQ(replaced, 1);
Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h
b/be/src/exec/partitioned-hash-join-node.h
index a51a15a..37c2c4f 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -245,7 +245,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// append to the spilled partitions' BTS or null probe rows' BTS fail.
template<int const JoinOp>
bool inline NextProbeRow(
- const HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
+ HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
int* remaining_capacity, int num_other_join_conjuncts, Status* status);
/// Process probe rows from probe_batch_. Returns either if out_batch is
full or
@@ -256,11 +256,11 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// set). This function doesn't commit rows to the output batch so it's the
caller's
/// responsibility to do so.
template<int const JoinOp>
- int ProcessProbeBatch(RowBatch* out_batch, const HashTableCtx* ht_ctx,
Status* status);
+ int ProcessProbeBatch(RowBatch* out_batch, HashTableCtx* ht_ctx, Status*
status);
/// Wrapper that calls the templated version of ProcessProbeBatch() based on
'join_op'.
int ProcessProbeBatch(const TJoinOp::type join_op, RowBatch* out_batch,
- const HashTableCtx* ht_ctx, Status* status);
+ HashTableCtx* ht_ctx, Status* status);
/// Sweep the hash_tbl_ of the partition that is at the front of
/// output_build_partitions_, using hash_tbl_iterator_ and output any
unmatched build
@@ -551,9 +551,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
private:
friend class PartitionedHashJoinNode;
- /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices'
contains the
- /// index of each row's index into the hash table's tuple stream. This
function is
- /// replaced with a codegen'd version.
+ /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is
an array
+ /// containing the index of each row's index into the hash table's tuple
stream.
+ /// This function is replaced with a codegen'd version.
bool InsertBatch(HashTableCtx* ctx, RowBatch* batch,
const std::vector<BufferedTupleStream::RowIdx>& indices);
@@ -581,6 +581,13 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// If NULL, ownership has been transfered.
BufferedTupleStream* build_rows_;
BufferedTupleStream* probe_rows_;
+
+ /// Store hash values of each row for the current batch computed during
prefetching.
+ std::vector<uint32_t> hash_values_;
+
+ /// Bitmap to indicate rows evaluated to NULL for the current batch when
building
+ /// hash tables.
+ Bitmap null_bitmap_;
};
/// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when
available
@@ -600,7 +607,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
ProcessProbeBatchFn process_probe_batch_fn_level0_;
typedef bool (*InsertBatchFn)(Partition*, HashTableCtx*, RowBatch*,
- const vector<BufferedTupleStream::RowIdx>&);
+ const std::vector<BufferedTupleStream::RowIdx>&);
/// Jitted Partition::InsertBatch() function pointers. NULL if codegen is
disabled.
InsertBatchFn insert_batch_fn_;
InsertBatchFn insert_batch_fn_level0_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e32720e0/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 33fa482..aac0043 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -400,10 +400,9 @@ class RowBatch {
/// Macro for iterating through '_row_batch', starting at '_start_row_idx'.
/// '_row_batch' is the row batch to iterate through.
/// '_start_row_idx' is the starting row index.
-/// '_row' is the current row which the iterator is pointing to.
-#define FOREACH_ROW(_row_batch, _start_row_idx, _row)
\
- RowBatch::Iterator _##_row_batch##_iter(_row_batch, _start_row_idx);
\
- for (TupleRow* _row = _##_row_batch##_iter.Get();
!_##_row_batch##_iter.AtEnd(); \
- _row = _##_row_batch##_iter.Next())
+/// '_iter' is the iterator.
+#define FOREACH_ROW(_row_batch, _start_row_idx, _iter) \
+ for (RowBatch::Iterator _iter(_row_batch, _start_row_idx); \
+ !_iter.AtEnd(); _iter.Next())
#endif