IMPALA-4231: fix codegen time regression

The commit "IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder"
slightly increased codegen time, which caused TPC-H Q2 to sometimes
regress significantly because of races in runtime filter arrival.

This patch attempts to fix the regression by improving codegen time in a
few places.

* Revert to using the old bool/Status return pattern. The regular Status
  return pattern results in significantly more complex IR because it has
  to emit code to copy and free statuses. I spent some time trying to
  convince it to optimise the extra code out, but didn't have much success.
* Remove some code that cannot be specialized from cross-compilation.
* Add noexcept to some functions that are used from the IR to ensure
  exception-handling IR is not emitted. This is less important after the
  first change but still should help produce cleaner IR.

Performance:
I was able to reproduce a regression locally, which is fixed by this
patch. I'm in the process of trying to verify the fix on a cluster.

Change-Id: Idf0fdedabd488550b6db90167a30c582949d608d
Reviewed-on: http://gerrit.cloudera.org:8080/4623
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7fe4385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7fe4385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7fe4385

Branch: refs/heads/hadoop-next
Commit: c7fe4385d927509443a1c4e2c6e9a802d2dcf63b
Parents: 89b41c6
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Fri Sep 30 15:18:54 2016 -0700
Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org>
Committed: Fri Oct 14 02:53:59 2016 +0000

----------------------------------------------------------------------
 be/src/common/status.h                          |  30 +--
 be/src/exec/hash-table.cc                       |   8 +-
 be/src/exec/hash-table.h                        |  16 +-
 be/src/exec/partitioned-aggregation-node-ir.cc  |   6 +-
 be/src/exec/partitioned-aggregation-node.cc     |  10 +-
 be/src/exec/partitioned-aggregation-node.h      |   9 +-
 be/src/exec/partitioned-hash-join-builder-ir.cc |  20 +-
 be/src/exec/partitioned-hash-join-builder.cc    |  19 +-
 be/src/exec/partitioned-hash-join-builder.h     |  14 +-
 be/src/exec/partitioned-hash-join-node-ir.cc    |  33 ++--
 be/src/exec/partitioned-hash-join-node.h        |   9 +-
 be/src/runtime/buffered-tuple-stream.cc         |  21 ++-
 be/src/runtime/buffered-tuple-stream.h          |  31 +--
 be/src/runtime/buffered-tuple-stream.inline.h   |   8 +-
 be/src/runtime/raw-value.cc                     | 169 +++++++++++++++++
 be/src/runtime/raw-value.h                      |   7 +-
 be/src/runtime/raw-value.inline.h               | 188 -------------------
 be/src/util/bloom-filter.cc                     |  72 +++++++
 be/src/util/bloom-filter.h                      |  80 +-------
 19 files changed, 382 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 35a9a94..9a9ea67 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -99,6 +99,10 @@ class Status {
     if (UNLIKELY(status.msg_ != NULL)) CopyMessageFrom(status);
   }
 
+  /// Move constructor that moves the error message (if any) and resets 
'other' to the
+  /// default OK Status.
+  ALWAYS_INLINE Status(Status&& other) : msg_(other.msg_) { other.msg_ = NULL; 
}
+
   /// Status using only the error code as a parameter. This can be used for 
error messages
   /// that don't take format parameters.
   Status(TErrorCode::type code);
@@ -153,6 +157,15 @@ class Status {
     return *this;
   }
 
+  /// Move assignment that moves the error message (if any) and resets 'other' 
to the
+  /// default OK Status.
+  ALWAYS_INLINE Status& operator=(Status&& other) {
+    if (UNLIKELY(msg_ != NULL)) FreeMessage();
+    msg_ = other.msg_;
+    other.msg_ = NULL;
+    return *this;
+  }
+
   ALWAYS_INLINE ~Status() {
     // The UNLIKELY and inlining here are important hints for the compiler to
     // streamline the common case of Status::OK(). Use FreeMessage() which is
@@ -244,21 +257,12 @@ class Status {
 };
 
 /// some generally useful macros
-#define RETURN_IF_ERROR(stmt) \
-  do { \
-    Status __status__ = (stmt); \
-    if (UNLIKELY(!__status__.ok())) return __status__; \
+#define RETURN_IF_ERROR(stmt)                                     \
+  do {                                                            \
+    Status __status__ = (stmt);                                   \
+    if (UNLIKELY(!__status__.ok())) return std::move(__status__); \
   } while (false)
 
-#define RETURN_IF_ERROR_PREPEND(expr, prepend) \
-  do { \
-    Status __status__ = (stmt); \
-    if (UNLIKELY(!__status__.ok())) { \
-      return Status(strings::Substitute("$0: $1", prepend, 
__status__.GetDetail())); \
-    } \
-  } while (false)
-
-
 #define ABORT_IF_ERROR(stmt) \
   do { \
     Status __status__ = (stmt); \

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 0d780b9..6626b33 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -149,7 +149,7 @@ uint32_t HashTableCtx::Hash(const void* input, int len, 
uint32_t hash) const {
 }
 
 uint32_t HashTableCtx::HashRow(
-    const uint8_t* expr_values, const uint8_t* expr_values_null) const {
+    const uint8_t* expr_values, const uint8_t* expr_values_null) const 
noexcept {
   DCHECK_LT(level_, seeds_.size());
   if (expr_values_cache_.var_result_offset() == -1) {
     /// This handles NULLs implicitly since a constant seed value was put
@@ -162,7 +162,7 @@ uint32_t HashTableCtx::HashRow(
 }
 
 bool HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& 
ctxs,
-    uint8_t* expr_values, uint8_t* expr_values_null) {
+    uint8_t* expr_values, uint8_t* expr_values_null) noexcept {
   bool has_null = false;
   for (int i = 0; i < ctxs.size(); ++i) {
     void* loc = expr_values_cache_.ExprValuePtr(expr_values, i);
@@ -213,7 +213,7 @@ uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* 
expr_values,
 
 template <bool FORCE_NULL_EQUALITY>
 bool HashTableCtx::Equals(const TupleRow* build_row, const uint8_t* 
expr_values,
-    const uint8_t* expr_values_null) const {
+    const uint8_t* expr_values_null) const noexcept {
   for (int i = 0; i < build_expr_ctxs_.size(); ++i) {
     void* val = build_expr_ctxs_[i]->GetValue(build_row);
     if (val == NULL) {
@@ -331,7 +331,7 @@ void HashTableCtx::ExprValuesCache::ResetIterators() {
   cur_expr_values_hash_ = expr_values_hash_array_.get();
 }
 
-void HashTableCtx::ExprValuesCache::Reset() {
+void HashTableCtx::ExprValuesCache::Reset() noexcept {
   ResetIterators();
   // Set the end pointer after resetting the other pointers so they point to
   // the same location.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index fead1f7..4edd130 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -256,7 +256,7 @@ class HashTableCtx {
     void Close(MemTracker* tracker);
 
     /// Resets the cache states (iterators, end pointers etc) before writing.
-    void Reset();
+    void Reset() noexcept;
 
     /// Resets the iterators to the start before reading. Will record the 
current position
     /// of the iterators in end pointer before resetting so AtEnd() can 
determine if all
@@ -406,7 +406,7 @@ class HashTableCtx {
   /// This will be replaced by codegen.  We don't want this inlined for 
replacing
   /// with codegen'd functions so the function name does not change.
   uint32_t IR_NO_INLINE HashRow(
-      const uint8_t* expr_values, const uint8_t* expr_values_null) const;
+      const uint8_t* expr_values, const uint8_t* expr_values_null) const 
noexcept;
 
   /// Wrapper function for calling correct HashUtil function in non-codegen'd 
case.
   uint32_t Hash(const void* input, int len, uint32_t hash) const;
@@ -416,15 +416,15 @@ class HashTableCtx {
   /// inlined when cross compiled because we need to be able to differentiate 
between
   /// EvalBuildRow and EvalProbeRow by name and the build/probe exprs are 
baked into the
   /// codegen'd function.
-  bool IR_NO_INLINE EvalBuildRow(const TupleRow* row, uint8_t* expr_values,
-      uint8_t* expr_values_null) {
+  bool IR_NO_INLINE EvalBuildRow(
+      const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) 
noexcept {
     return EvalRow(row, build_expr_ctxs_, expr_values, expr_values_null);
   }
 
   /// Evaluate 'row' over probe exprs, storing the values into 'expr_values' 
and nullness
   /// into 'expr_values_null'. This will be replaced by codegen.
-  bool IR_NO_INLINE EvalProbeRow(const TupleRow* row, uint8_t* expr_values,
-      uint8_t* expr_values_null) {
+  bool IR_NO_INLINE EvalProbeRow(
+      const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) 
noexcept {
     return EvalRow(row, probe_expr_ctxs_, expr_values, expr_values_null);
   }
 
@@ -437,7 +437,7 @@ class HashTableCtx {
   /// 'expr_values_null'. Returns whether any expr evaluated to NULL. This 
will be
   /// replaced by codegen.
   bool EvalRow(const TupleRow* row, const std::vector<ExprContext*>& ctxs,
-      uint8_t* expr_values, uint8_t* expr_values_null);
+      uint8_t* expr_values, uint8_t* expr_values_null) noexcept;
 
   /// Returns true if the values of build_exprs evaluated over 'build_row' 
equal the
   /// values in 'expr_values' with nullness 'expr_values_null'. 
FORCE_NULL_EQUALITY is
@@ -445,7 +445,7 @@ class HashTableCtx {
   /// 'finds_nulls_'. This will be replaced by codegen.
   template <bool FORCE_NULL_EQUALITY>
   bool IR_NO_INLINE Equals(const TupleRow* build_row, const uint8_t* 
expr_values,
-      const uint8_t* expr_values_null) const;
+      const uint8_t* expr_values_null) const noexcept;
 
   /// Helper function that calls Equals() with the current row. Always inlined 
so that
   /// it does not appear in cross-compiled IR.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc 
b/be/src/exec/partitioned-aggregation-node-ir.cc
index ed95844..83362e7 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -153,7 +153,7 @@ Status 
PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__
       insert_it.SetTuple(intermediate_tuple, hash);
       return Status::OK();
     } else if (!process_batch_status_.ok()) {
-      return process_batch_status_;
+      return std::move(process_batch_status_);
     }
 
     // We did not have enough memory to add intermediate_tuple to the stream.
@@ -198,13 +198,13 @@ Status 
PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
           !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
             GetHashTable(partition_idx), in_row, hash, 
&remaining_capacity[partition_idx],
             &process_batch_status_)) {
-        RETURN_IF_ERROR(process_batch_status_);
+        RETURN_IF_ERROR(std::move(process_batch_status_));
         // Tuple is not going into hash table, add it to the output batch.
         Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_,
             out_batch->tuple_data_pool(), &process_batch_status_);
         if (UNLIKELY(intermediate_tuple == NULL)) {
           DCHECK(!process_batch_status_.ok());
-          return process_batch_status_;
+          return std::move(process_batch_status_);
         }
         UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row);
         out_batch_iterator.Get()->SetTuple(0, intermediate_tuple);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc 
b/be/src/exec/partitioned-aggregation-node.cc
index f926725..629e407 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -965,7 +965,7 @@ Tuple* 
PartitionedAggregationNode::ConstructSingletonOutputTuple(
 }
 
 Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* 
status) {
+    const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* 
status) noexcept {
   const int fixed_size = intermediate_tuple_desc_->byte_size();
   const int varlen_size = GroupingExprsVarlenSize();
   const int tuple_data_size = fixed_size + varlen_size;
@@ -985,8 +985,8 @@ Tuple* 
PartitionedAggregationNode::ConstructIntermediateTuple(
 }
 
 Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
-    const vector<FunctionContext*>& agg_fn_ctxs,
-    BufferedTupleStream* stream, Status* status) {
+    const vector<FunctionContext*>& agg_fn_ctxs, BufferedTupleStream* stream,
+    Status* status) noexcept {
   DCHECK(stream != NULL && status != NULL);
   // Allocate space for the entire tuple in the stream.
   const int fixed_size = intermediate_tuple_desc_->byte_size();
@@ -1090,8 +1090,8 @@ void PartitionedAggregationNode::InitAggSlots(
   }
 }
 
-void PartitionedAggregationNode::UpdateTuple(FunctionContext** agg_fn_ctxs,
-    Tuple* tuple, TupleRow* row, bool is_merge) {
+void PartitionedAggregationNode::UpdateTuple(
+    FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, bool is_merge) 
noexcept {
   DCHECK(tuple != NULL || aggregate_evaluators_.empty());
   for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
     if (is_merge) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h 
b/be/src/exec/partitioned-aggregation-node.h
index c766ab2..0c0f3e8 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -444,14 +444,13 @@ class PartitionedAggregationNode : public ExecNode {
   /// full, it will attempt to switch to IO-buffers.
   Tuple* ConstructIntermediateTuple(
       const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
-      BufferedTupleStream* stream, Status* status);
+      BufferedTupleStream* stream, Status* status) noexcept;
 
   /// Constructs intermediate tuple, allocating memory from pool instead of 
the stream.
   /// Returns NULL and sets status if there is not enough memory to allocate 
the tuple.
   Tuple* ConstructIntermediateTuple(
-      const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs,
-      MemPool* pool, Status* status);
-
+      const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* 
pool,
+      Status* status) noexcept;
 
   /// Returns the number of bytes of variable-length data for the grouping 
values stored
   /// in 'ht_ctx_'.
@@ -477,7 +476,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// This function is replaced by codegen (which is why we don't use a vector 
argument
   /// for agg_fn_ctxs).. Any var-len data is allocated from the 
FunctionContexts.
   void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, 
TupleRow* row,
-      bool is_merge = false);
+      bool is_merge = false) noexcept;
 
   /// Called on the intermediate tuple of each group after all input rows have 
been
   /// consumed and aggregated. Computes the final aggregate values to be 
returned in

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc 
b/be/src/exec/partitioned-hash-join-builder-ir.cc
index 21fd9e4..11974e3 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -29,15 +29,16 @@
 
 using namespace impala;
 
-inline Status PhjBuilder::AppendRow(BufferedTupleStream* stream, TupleRow* 
row) {
-  Status status;
-  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
-  RETURN_IF_ERROR(status);
-  return AppendRowStreamFull(stream, row);
+inline bool PhjBuilder::AppendRow(
+    BufferedTupleStream* stream, TupleRow* row, Status* status) {
+  if (LIKELY(stream->AddRow(row, status))) return true;
+  if (UNLIKELY(!status->ok())) return false;
+  return AppendRowStreamFull(stream, row, status);
 }
 
 Status PhjBuilder::ProcessBuildBatch(
     RowBatch* build_batch, HashTableCtx* ctx, bool build_filters) {
+  Status status;
   HashTableCtx::ExprValuesCache* expr_vals_cache = ctx->expr_values_cache();
   expr_vals_cache->Reset();
   FOREACH_ROW(build_batch, 0, build_batch_iter) {
@@ -47,7 +48,10 @@ Status PhjBuilder::ProcessBuildBatch(
         // TODO: remove with codegen/template
         // If we are NULL aware and this build row has NULL in the eq join 
slot,
         // append it to the null_aware partition. We will need it later.
-        RETURN_IF_ERROR(AppendRow(null_aware_partition_->build_rows(), 
build_row));
+        if (UNLIKELY(
+                !AppendRow(null_aware_partition_->build_rows(), build_row, 
&status))) {
+          return std::move(status);
+        }
       }
       continue;
     }
@@ -66,7 +70,9 @@ Status PhjBuilder::ProcessBuildBatch(
     const uint32_t hash = expr_vals_cache->CurExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
     Partition* partition = hash_partitions_[partition_idx];
-    RETURN_IF_ERROR(AppendRow(partition->build_rows(), build_row));
+    if (UNLIKELY(!AppendRow(partition->build_rows(), build_row, &status))) {
+      return std::move(status);
+    }
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc 
b/be/src/exec/partitioned-hash-join-builder.cc
index b17bbff..bf5b42a 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -260,23 +260,26 @@ Status PhjBuilder::CreateHashPartitions(int level) {
   return Status::OK();
 }
 
-Status PhjBuilder::AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* 
row) {
-  Status status;
+bool PhjBuilder::AppendRowStreamFull(
+    BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept {
   while (true) {
     // Check if the stream is still using small buffers and try to switch to 
IO-buffers.
     if (stream->using_small_buffers()) {
       bool got_buffer;
-      RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer));
+      *status = stream->SwitchToIoBuffers(&got_buffer);
+      if (!status->ok()) return false;
+
       if (got_buffer) {
-        if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
-        RETURN_IF_ERROR(status);
+        if (LIKELY(stream->AddRow(row, status))) return true;
+        if (!status->ok()) return false;
       }
     }
     // We ran out of memory. Pick a partition to spill. If we ran out of 
unspilled
     // partitions, SpillPartition() will return an error status.
-    
RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
-    if (stream->AddRow(row, &status)) return Status::OK();
-    RETURN_IF_ERROR(status);
+    *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+    if (!status->ok()) return false;
+    if (stream->AddRow(row, status)) return true;
+    if (!status->ok()) return false;
     // Spilling one partition does not guarantee we can append a row. Keep
     // spilling until we can append this row.
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h 
b/be/src/exec/partitioned-hash-join-builder.h
index 23822b2..7f81e5a 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -261,14 +261,18 @@ class PhjBuilder : public DataSink {
 
   /// Append 'row' to 'stream'. In the common case, appending the row to the 
stream
   /// immediately succeeds. Otherwise this function falls back to the slower 
path of
-  /// AppendRowStreamFull(), which may spill partitions to free memory. 
Returns an error
-  /// if it was unable to append the row, even after spilling partitions.
-  Status AppendRow(BufferedTupleStream* stream, TupleRow* row);
+  /// AppendRowStreamFull(), which may spill partitions to free memory. 
Returns false
+  /// and sets 'status' if it was unable to append the row, even after spilling
+  /// partitions. This odd return convention is used to avoid emitting 
unnecessary code
+  /// for ~Status in perf-critical code.
+  bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status);
 
   /// Slow path for AppendRow() above. It is called when the stream has failed 
to append
   /// the row. We need to find more memory by either switching to IO-buffers, 
in case the
-  /// stream still uses small buffers, or spilling a partition.
-  Status AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row);
+  /// stream still uses small buffers, or spilling a partition. Returns false 
and sets
+  /// 'status' if it was unable to append the row, even after spilling 
partitions.
+  bool AppendRowStreamFull(
+      BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept;
 
   /// Frees memory by spilling one of the hash partitions. The 'mode' argument 
is passed
   /// to the Spill() call for the selected partition. The current policy is to 
spill the

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc 
b/be/src/exec/partitioned-hash-join-node-ir.cc
index 44cb14b..bbed06d 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -149,10 +149,11 @@ bool IR_ALWAYS_INLINE 
PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
       // build side. For those rows, we need to process the remaining join
       // predicates later.
       if (builder_->null_aware_partition()->build_rows()->num_rows() != 0) {
-        if (num_other_join_conjuncts > 0) {
-          *status = AppendProbeRow(
-              null_aware_probe_partition_->probe_rows(), current_probe_row_);
-          if (UNLIKELY(!status->ok())) return false;
+        if (num_other_join_conjuncts > 0
+            && 
UNLIKELY(!AppendProbeRow(null_aware_probe_partition_->probe_rows(),
+                   current_probe_row_, status))) {
+          DCHECK(!status->ok());
+          return false;
         }
         return true;
       }
@@ -217,7 +218,7 @@ bool IR_ALWAYS_INLINE 
PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
   return true;
 }
 
-template<int const JoinOp>
+template <int const JoinOp>
 bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
     ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
     ExprContext* const* conjunct_ctxs, int num_conjuncts,
@@ -282,8 +283,11 @@ bool IR_ALWAYS_INLINE 
PartitionedHashJoinNode::NextProbeRow(
           skip_row = true;
         } else {
           // Condition 3 above.
-          *status = AppendProbeRow(null_probe_rows_.get(), current_probe_row_);
-          if (UNLIKELY(!status->ok())) return false;
+          if (UNLIKELY(
+                  !AppendProbeRow(null_probe_rows_.get(), current_probe_row_, 
status))) {
+            DCHECK(!status->ok());
+            return false;
+          }
           matched_null_probe_.push_back(false);
           skip_row = true;
         }
@@ -306,8 +310,10 @@ bool IR_ALWAYS_INLINE 
PartitionedHashJoinNode::NextProbeRow(
           // Skip the current row if we manage to append to the spilled 
partition's BTS.
           // Otherwise, we need to bail out and report the failure.
           BufferedTupleStream* probe_rows = probe_partition->probe_rows();
-          *status = AppendProbeRow(probe_rows, current_probe_row_);
-          if (UNLIKELY(!status->ok())) return false;
+          if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, 
status))) {
+            DCHECK(!status->ok());
+            return false;
+          }
           skip_row = true;
         }
       }
@@ -426,15 +432,12 @@ int 
PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
   return num_rows_added;
 }
 
-inline Status PartitionedHashJoinNode::AppendProbeRow(
-    BufferedTupleStream* stream, TupleRow* row) {
+inline bool PartitionedHashJoinNode::AppendProbeRow(
+    BufferedTupleStream* stream, TupleRow* row, Status* status) {
   DCHECK(stream->has_write_block());
   DCHECK(!stream->using_small_buffers());
   DCHECK(!stream->is_pinned());
-  Status status;
-  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
-  DCHECK(!status.ok());
-  return status;
+  return stream->AddRow(row, status);
 }
 
 template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h 
b/be/src/exec/partitioned-hash-join-node.h
index 9827788..5b9264c 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -166,7 +166,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
 
   /// Append the probe row 'row' to 'stream'. The stream must be unpinned and 
must have
   /// a write buffer allocated, so this will succeed unless an error is 
encountered.
-  Status AppendProbeRow(BufferedTupleStream* stream, TupleRow* row);
+  /// Returns false and sets 'status' to an error if an error is encountered. 
This odd
+  /// return convention is used to avoid emitting unnecessary code for ~Status 
in perf-
+  /// critical code.
+  bool AppendProbeRow(BufferedTupleStream* stream, TupleRow* row, Status* 
status);
 
   /// Probes the hash table for rows matching the current probe row and appends
   /// all the matching build rows (with probe row) to output batch. Returns 
true
@@ -267,8 +270,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   /// probe_batch_ is entirely consumed.
   /// For RIGHT_ANTI_JOIN, all this function does is to mark whether each 
build row
   /// had a match.
-  /// Returns the number of rows added to out_batch; -1 on error (and *status 
will be
-  /// set). This function doesn't commit rows to the output batch so it's the 
caller's
+  /// Returns the number of rows added to out_batch; -1 on error (and *status 
will
+  /// be set). This function doesn't commit rows to the output batch so it's 
the caller's
   /// responsibility to do so.
   template<int const JoinOp>
   int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, 
HashTableCtx* ht_ctx,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc 
b/be/src/runtime/buffered-tuple-stream.cc
index b2fce45..e3b3b4a 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -222,8 +222,8 @@ Status 
BufferedTupleStream::UnpinBlock(BufferedBlockMgr::Block* block) {
   return Status::OK();
 }
 
-Status BufferedTupleStream::NewWriteBlock(int64_t block_len, int64_t 
null_indicators_size,
-    bool* got_block) {
+Status BufferedTupleStream::NewWriteBlock(
+    int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept 
{
   DCHECK(!closed_);
   DCHECK_GE(null_indicators_size, 0);
   *got_block = false;
@@ -282,7 +282,8 @@ Status BufferedTupleStream::NewWriteBlock(int64_t 
block_len, int64_t null_indica
   return Status::OK();
 }
 
-Status BufferedTupleStream::NewWriteBlockForRow(int64_t row_size, bool* 
got_block) {
+Status BufferedTupleStream::NewWriteBlockForRow(
+    int64_t row_size, bool* got_block) noexcept {
   int64_t block_len;
   int64_t null_indicators_size;
   if (use_small_buffers_) {
@@ -694,7 +695,7 @@ void BufferedTupleStream::FixUpCollectionsForRead(const 
vector<SlotDescriptor*>&
   }
 }
 
-int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const {
+int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const noexcept {
   int64_t size = 0;
   if (has_nullable_tuple_) {
     for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) {
@@ -733,7 +734,15 @@ int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) 
const {
   return size;
 }
 
-bool BufferedTupleStream::DeepCopy(TupleRow* row) {
+bool BufferedTupleStream::AddRowSlow(TupleRow* row, Status* status) noexcept {
+  bool got_block;
+  int64_t row_size = ComputeRowSize(row);
+  *status = NewWriteBlockForRow(row_size, &got_block);
+  if (!status->ok() || !got_block) return false;
+  return DeepCopy(row);
+}
+
+bool BufferedTupleStream::DeepCopy(TupleRow* row) noexcept {
   if (has_nullable_tuple_) {
     return DeepCopyInternal<true>(row);
   } else {
@@ -744,7 +753,7 @@ bool BufferedTupleStream::DeepCopy(TupleRow* row) {
 // TODO: this really needs codegen
 // TODO: in case of duplicate tuples, this can redundantly serialize data.
 template <bool HasNullableTuple>
-bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) {
+bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) noexcept {
   if (UNLIKELY(write_block_ == NULL)) return false;
   DCHECK_GE(write_block_null_indicators_size_, 0);
   DCHECK(write_block_->is_pinned()) << DebugString() << std::endl

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h 
b/be/src/runtime/buffered-tuple-stream.h
index d3bfa81..d138150 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -238,15 +238,17 @@ class BufferedTupleStream {
   /// Must be called for streams using small buffers to switch to IO-sized 
buffers.
   /// If it fails to get a buffer (i.e. the switch fails) it resets the 
use_small_buffers_
   /// back to false.
-  /// TODO: this does not seem like the best mechanism.
+  /// TODO: IMPALA-3200: remove this when small buffers are removed.
   Status SwitchToIoBuffers(bool* got_buffer);
 
-  /// Adds a single row to the stream. Returns false and sets *status if an 
error
-  /// occurred. BufferedTupleStream will do a deep copy of the memory in the 
row.
-  /// After AddRow returns false, it should not be called again, unless
-  /// using_small_buffers_ is true, in which case it is valid to call 
SwitchToIoBuffers()
-  /// then AddRow() again.
-  bool AddRow(TupleRow* row, Status* status);
+  /// Adds a single row to the stream. Returns true if the append succeeded, 
returns false
+  /// and sets 'status' to OK if appending failed but can be retried or 
returns false and
+  /// sets 'status' to an error if an error occurred.
+  /// BufferedTupleStream will do a deep copy of the memory in the row. After 
AddRow()
+  /// returns an error, it should not be called again. If appending failed 
without an
+  /// error and the stream is using small buffers, it is valid to call
+  /// SwitchToIoBuffers() then AddRow() again.
+  bool AddRow(TupleRow* row, Status* status) noexcept;
 
   /// Allocates space to store a row of with fixed length 'fixed_size' and 
variable
   /// length data 'varlen_size'. If successful, returns the pointer where 
fixed length
@@ -458,11 +460,15 @@ class BufferedTupleStream {
   RuntimeProfile::Counter* unpin_timer_;
   RuntimeProfile::Counter* get_new_block_timer_;
 
+  /// The slow path for AddRow() that is called if there is not sufficient 
space in
+  /// the current block.
+  bool AddRowSlow(TupleRow* row, Status* status) noexcept;
+
   /// Copies 'row' into write_block_. Returns false if there is not enough 
space in
   /// 'write_block_'. After returning false, write_ptr_ may be left pointing 
to the
   /// partially-written row, and no more data can be written to write_block_.
   template <bool HAS_NULLABLE_TUPLE>
-  bool DeepCopyInternal(TupleRow* row);
+  bool DeepCopyInternal(TupleRow* row) noexcept;
 
   /// Helper function to copy strings in string_slots from tuple into 
write_block_.
   /// Updates write_ptr_ to the end of the string data added. Returns false if 
the data
@@ -480,7 +486,7 @@ class BufferedTupleStream {
       const std::vector<SlotDescriptor*>& collection_slots);
 
   /// Wrapper of the templated DeepCopyInternal() function.
-  bool DeepCopy(TupleRow* row);
+  bool DeepCopy(TupleRow* row) noexcept;
 
   /// Gets a new block of 'block_len' bytes from the block_mgr_, updating 
write_block_,
   /// write_tuple_idx_, write_ptr_ and write_end_ptr_. 'null_indicators_size' 
is the
@@ -488,12 +494,13 @@ class BufferedTupleStream {
   /// *got_block is set to true if a block was successfully acquired. Null 
indicators
   /// (if any) will also be reserved and initialized. If there are no blocks 
available,
   /// *got_block is set to false and write_block_ is unchanged.
-  Status NewWriteBlock(int64_t block_len, int64_t null_indicators_size, bool* 
got_block);
+  Status NewWriteBlock(
+      int64_t block_len, int64_t null_indicators_size, bool* got_block) 
noexcept;
 
   /// A wrapper around NewWriteBlock(). 'row_size' is the size of the tuple 
row to be
   /// appended to this block. This function determines the block size required 
in order
   /// to fit the row and null indicators.
-  Status NewWriteBlockForRow(int64_t row_size, bool* got_block);
+  Status NewWriteBlockForRow(int64_t row_size, bool* got_block) noexcept;
 
   /// Reads the next block from the block_mgr_. This blocks if necessary.
   /// Updates read_block_, read_ptr_, read_tuple_idx_ and read_end_ptr_.
@@ -502,7 +509,7 @@ class BufferedTupleStream {
   /// Returns the total additional bytes that this row will consume in 
write_block_ if
   /// appended to the block. This includes the fixed length part of the row 
and the
   /// data for inlined_string_slots_ and inlined_coll_slots_.
-  int64_t ComputeRowSize(TupleRow* row) const;
+  int64_t ComputeRowSize(TupleRow* row) const noexcept;
 
   /// Unpins block if it is an IO-sized block and updates tracking stats.
   Status UnpinBlock(BufferedBlockMgr::Block* block);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.inline.h 
b/be/src/runtime/buffered-tuple-stream.inline.h
index 7a2f247..ba6bb8c 100644
--- a/be/src/runtime/buffered-tuple-stream.inline.h
+++ b/be/src/runtime/buffered-tuple-stream.inline.h
@@ -25,14 +25,10 @@
 
 namespace impala {
 
-inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) {
+inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) 
noexcept {
   DCHECK(!closed_);
   if (LIKELY(DeepCopy(row))) return true;
-  bool got_block;
-  int64_t row_size = ComputeRowSize(row);
-  *status = NewWriteBlockForRow(row_size, &got_block);
-  if (!status->ok() || !got_block) return false;
-  return DeepCopy(row);
+  return AddRowSlow(row, status);
 }
 
 inline uint8_t* BufferedTupleStream::AllocateRow(int fixed_size, int 
varlen_size,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc
index 7247b8e..aef58f2 100644
--- a/be/src/runtime/raw-value.cc
+++ b/be/src/runtime/raw-value.cc
@@ -191,4 +191,173 @@ void RawValue::Write(const void* value, Tuple* tuple, 
const SlotDescriptor* slot
   }
 }
 
+uint32_t RawValue::GetHashValue(
+    const void* v, const ColumnType& type, uint32_t seed) noexcept {
+  // The choice of hash function needs to be consistent across all hosts of 
the cluster.
+
+  // Use HashCombine with arbitrary constant to ensure we don't return seed.
+  if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
+
+  switch (type.type) {
+    case TYPE_CHAR:
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+      return RawValue::GetHashValueNonNull<impala::StringValue>(
+          reinterpret_cast<const StringValue*>(v), type, seed);
+    case TYPE_BOOLEAN:
+      return RawValue::GetHashValueNonNull<bool>(
+          reinterpret_cast<const bool*>(v), type, seed);
+    case TYPE_TINYINT:
+      return RawValue::GetHashValueNonNull<int8_t>(
+          reinterpret_cast<const int8_t*>(v), type, seed);
+    case TYPE_SMALLINT:
+      return RawValue::GetHashValueNonNull<int16_t>(
+          reinterpret_cast<const int16_t*>(v), type, seed);
+    case TYPE_INT:
+      return RawValue::GetHashValueNonNull<int32_t>(
+          reinterpret_cast<const int32_t*>(v), type, seed);
+    case TYPE_BIGINT:
+      return RawValue::GetHashValueNonNull<int64_t>(
+          reinterpret_cast<const int64_t*>(v), type, seed);
+    case TYPE_FLOAT:
+      return RawValue::GetHashValueNonNull<float>(
+          reinterpret_cast<const float*>(v), type, seed);
+    case TYPE_DOUBLE:
+      return RawValue::GetHashValueNonNull<double>(
+          reinterpret_cast<const double*>(v), type, seed);
+    case TYPE_TIMESTAMP:
+      return RawValue::GetHashValueNonNull<TimestampValue>(
+          reinterpret_cast<const TimestampValue*>(v), type, seed);
+    case TYPE_DECIMAL:
+      switch (type.GetByteSize()) {
+        case 4:
+          return RawValue::GetHashValueNonNull<Decimal4Value>(
+              reinterpret_cast<const impala::Decimal4Value*>(v), type, seed);
+        case 8:
+          return RawValue::GetHashValueNonNull<Decimal8Value>(
+              reinterpret_cast<const Decimal8Value*>(v), type, seed);
+        case 16:
+          return RawValue::GetHashValueNonNull<Decimal16Value>(
+              reinterpret_cast<const Decimal16Value*>(v), type, seed);
+          DCHECK(false);
+      }
+    default: DCHECK(false); return 0;
+  }
+}
+
+uint32_t RawValue::GetHashValueFnv(const void* v, const ColumnType& type, 
uint32_t seed) {
+  // Use HashCombine with arbitrary constant to ensure we don't return seed.
+  if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
+
+  switch (type.type) {
+    case TYPE_STRING:
+    case TYPE_VARCHAR: {
+      const StringValue* string_value = reinterpret_cast<const 
StringValue*>(v);
+      if (string_value->len == 0) {
+        return HashUtil::HashCombine32(HASH_VAL_EMPTY, seed);
+      }
+      return HashUtil::FnvHash64to32(string_value->ptr, string_value->len, 
seed);
+    }
+    case TYPE_BOOLEAN:
+      return HashUtil::HashCombine32(*reinterpret_cast<const bool*>(v), seed);
+    case TYPE_TINYINT: return HashUtil::FnvHash64to32(v, 1, seed);
+    case TYPE_SMALLINT: return HashUtil::FnvHash64to32(v, 2, seed);
+    case TYPE_INT: return HashUtil::FnvHash64to32(v, 4, seed);
+    case TYPE_BIGINT: return HashUtil::FnvHash64to32(v, 8, seed);
+    case TYPE_FLOAT: return HashUtil::FnvHash64to32(v, 4, seed);
+    case TYPE_DOUBLE: return HashUtil::FnvHash64to32(v, 8, seed);
+    case TYPE_TIMESTAMP: return HashUtil::FnvHash64to32(v, 12, seed);
+    case TYPE_CHAR:
+      return HashUtil::FnvHash64to32(StringValue::CharSlotToPtr(v, type), 
type.len, seed);
+    case TYPE_DECIMAL: return HashUtil::FnvHash64to32(v, type.GetByteSize(), 
seed);
+    default: DCHECK(false); return 0;
+  }
+}
+
+void RawValue::PrintValue(
+    const void* value, const ColumnType& type, int scale, std::stringstream* 
stream) {
+  if (value == NULL) {
+    *stream << "NULL";
+    return;
+  }
+
+  int old_precision = stream->precision();
+  std::ios_base::fmtflags old_flags = stream->flags();
+  if (scale > -1) {
+    stream->precision(scale);
+    // Setting 'fixed' causes precision to set the number of digits printed 
after the
+    // decimal (by default it sets the maximum number of digits total).
+    *stream << std::fixed;
+  }
+
+  const StringValue* string_val = NULL;
+  switch (type.type) {
+    case TYPE_BOOLEAN: {
+      bool val = *reinterpret_cast<const bool*>(value);
+      *stream << (val ? "true" : "false");
+      return;
+    }
+    case TYPE_TINYINT:
+      // Extra casting for chars since they should not be interpreted as ASCII.
+      *stream << static_cast<int>(*reinterpret_cast<const int8_t*>(value));
+      break;
+    case TYPE_SMALLINT: *stream << *reinterpret_cast<const int16_t*>(value); 
break;
+    case TYPE_INT: *stream << *reinterpret_cast<const int32_t*>(value); break;
+    case TYPE_BIGINT: *stream << *reinterpret_cast<const int64_t*>(value); 
break;
+    case TYPE_FLOAT: {
+      float val = *reinterpret_cast<const float*>(value);
+      if (LIKELY(std::isfinite(val))) {
+        *stream << val;
+      } else if (std::isinf(val)) {
+        // 'Infinity' is Java's text representation of inf. By staying close 
to Java, we
+        // allow Hive to read text tables containing non-finite values 
produced by
+        // Impala. (The same logic applies to 'NaN', below).
+        *stream << (val < 0 ? "-Infinity" : "Infinity");
+      } else if (std::isnan(val)) {
+        *stream << "NaN";
+      }
+    } break;
+    case TYPE_DOUBLE: {
+      double val = *reinterpret_cast<const double*>(value);
+      if (LIKELY(std::isfinite(val))) {
+        *stream << val;
+      } else if (std::isinf(val)) {
+        // See TYPE_FLOAT for rationale.
+        *stream << (val < 0 ? "-Infinity" : "Infinity");
+      } else if (std::isnan(val)) {
+        *stream << "NaN";
+      }
+    } break;
+    case TYPE_VARCHAR:
+    case TYPE_STRING:
+      string_val = reinterpret_cast<const StringValue*>(value);
+      if (type.type == TYPE_VARCHAR) DCHECK(string_val->len <= type.len);
+      stream->write(string_val->ptr, string_val->len);
+      break;
+    case TYPE_TIMESTAMP:
+      *stream << *reinterpret_cast<const TimestampValue*>(value);
+      break;
+    case TYPE_CHAR:
+      stream->write(StringValue::CharSlotToPtr(value, type), type.len);
+      break;
+    case TYPE_DECIMAL:
+      switch (type.GetByteSize()) {
+        case 4:
+          *stream << reinterpret_cast<const 
Decimal4Value*>(value)->ToString(type);
+          break;
+        case 8:
+          *stream << reinterpret_cast<const 
Decimal8Value*>(value)->ToString(type);
+          break;
+        case 16:
+          *stream << reinterpret_cast<const 
Decimal16Value*>(value)->ToString(type);
+          break;
+        default: DCHECK(false) << type;
+      }
+      break;
+    default: DCHECK(false);
+  }
+  stream->precision(old_precision);
+  // Undo setting stream to fixed
+  stream->flags(old_flags);
+}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h
index bc76b2c..5ec8ed1 100644
--- a/be/src/runtime/raw-value.h
+++ b/be/src/runtime/raw-value.h
@@ -54,8 +54,8 @@ class RawValue {
 
   /// Returns hash value for 'v' interpreted as 'type'.  The resulting hash 
value
   /// is combined with the seed value.
-  static inline uint32_t GetHashValue(const void* v, const ColumnType& type,
-      uint32_t seed = 0);
+  static uint32_t GetHashValue(
+      const void* v, const ColumnType& type, uint32_t seed = 0) noexcept;
 
   /// Templatized version of GetHashValue, use if type is known ahead. 
GetHashValue
   /// handles nulls.
@@ -74,8 +74,7 @@ class RawValue {
   /// GetHashValue() does not have this property and cannot be safely used as 
the first
   /// step in data repartitioning. However, GetHashValue() can be 
significantly faster.
   /// TODO: fix GetHashValue
-  static inline uint32_t GetHashValueFnv(const void* v, const ColumnType& type,
-      uint32_t seed);
+  static uint32_t GetHashValueFnv(const void* v, const ColumnType& type, 
uint32_t seed);
 
   /// Compares both values.
   /// Return value is < 0  if v1 < v2, 0 if v1 == v2, > 0 if v1 > v2.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/raw-value.inline.h 
b/be/src/runtime/raw-value.inline.h
index a1f1d75..63c9a07 100644
--- a/be/src/runtime/raw-value.inline.h
+++ b/be/src/runtime/raw-value.inline.h
@@ -211,194 +211,6 @@ inline uint32_t RawValue::GetHashValue(const T* v, const 
ColumnType& type,
   if (UNLIKELY(v == NULL)) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
   return RawValue::GetHashValueNonNull<T>(v, type, seed);
 }
-
-inline uint32_t RawValue::GetHashValue(const void* v, const ColumnType& type,
-    uint32_t seed) {
-  //The choice of hash function needs to be consistent across all hosts of the 
cluster.
-
-  // Use HashCombine with arbitrary constant to ensure we don't return seed.
-  if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
-
-  switch (type.type) {
-    case TYPE_CHAR:
-    case TYPE_STRING:
-    case TYPE_VARCHAR:
-      return RawValue::GetHashValueNonNull<impala::StringValue>(
-        reinterpret_cast<const StringValue*>(v), type, seed);
-    case TYPE_BOOLEAN:
-      return RawValue::GetHashValueNonNull<bool>(
-        reinterpret_cast<const bool*>(v), type, seed);
-    case TYPE_TINYINT:
-      return RawValue::GetHashValueNonNull<int8_t>(
-        reinterpret_cast<const int8_t*>(v), type, seed);
-    case TYPE_SMALLINT:
-      return RawValue::GetHashValueNonNull<int16_t>(
-        reinterpret_cast<const int16_t*>(v), type, seed);
-    case TYPE_INT:
-      return RawValue::GetHashValueNonNull<int32_t>(
-        reinterpret_cast<const int32_t*>(v), type, seed);
-    case TYPE_BIGINT:
-      return RawValue::GetHashValueNonNull<int64_t>(
-        reinterpret_cast<const int64_t*>(v), type, seed);
-    case TYPE_FLOAT:
-      return  RawValue::GetHashValueNonNull<float>(
-        reinterpret_cast<const float*>(v), type, seed);
-    case TYPE_DOUBLE:
-      return RawValue::GetHashValueNonNull<double>(
-        reinterpret_cast<const double*>(v), type, seed);
-    case TYPE_TIMESTAMP:
-      return  RawValue::GetHashValueNonNull<TimestampValue>(
-        reinterpret_cast<const TimestampValue*>(v), type, seed);
-    case TYPE_DECIMAL:
-      switch(type.GetByteSize()) {
-        case 4: return
-          RawValue::GetHashValueNonNull<Decimal4Value>(
-            reinterpret_cast<const impala::Decimal4Value*>(v), type, seed);
-        case 8:
-          return RawValue::GetHashValueNonNull<Decimal8Value>(
-            reinterpret_cast<const Decimal8Value*>(v), type, seed);
-        case 16:
-          return RawValue::GetHashValueNonNull<Decimal16Value>(
-            reinterpret_cast<const Decimal16Value*>(v), type, seed);
-        DCHECK(false);
-    }
-    default:
-      DCHECK(false);
-      return 0;
-  }
-}
-
-inline uint32_t RawValue::GetHashValueFnv(const void* v, const ColumnType& 
type,
-    uint32_t seed) {
-  // Use HashCombine with arbitrary constant to ensure we don't return seed.
-  if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed);
-
-  switch (type.type ) {
-    case TYPE_STRING:
-    case TYPE_VARCHAR: {
-      const StringValue* string_value = reinterpret_cast<const 
StringValue*>(v);
-      if (string_value->len == 0) {
-        return HashUtil::HashCombine32(HASH_VAL_EMPTY, seed);
-      }
-      return HashUtil::FnvHash64to32(string_value->ptr, string_value->len, 
seed);
-    }
-    case TYPE_BOOLEAN:
-      return HashUtil::HashCombine32(*reinterpret_cast<const bool*>(v), seed);
-    case TYPE_TINYINT: return HashUtil::FnvHash64to32(v, 1, seed);
-    case TYPE_SMALLINT: return HashUtil::FnvHash64to32(v, 2, seed);
-    case TYPE_INT: return HashUtil::FnvHash64to32(v, 4, seed);
-    case TYPE_BIGINT: return HashUtil::FnvHash64to32(v, 8, seed);
-    case TYPE_FLOAT: return HashUtil::FnvHash64to32(v, 4, seed);
-    case TYPE_DOUBLE: return HashUtil::FnvHash64to32(v, 8, seed);
-    case TYPE_TIMESTAMP: return HashUtil::FnvHash64to32(v, 12, seed);
-    case TYPE_CHAR: return 
HashUtil::FnvHash64to32(StringValue::CharSlotToPtr(v, type),
-                                                   type.len, seed);
-    case TYPE_DECIMAL: return HashUtil::FnvHash64to32(v, type.GetByteSize(), 
seed);
-    default:
-      DCHECK(false);
-      return 0;
-  }
-}
-
-inline void RawValue::PrintValue(const void* value, const ColumnType& type, 
int scale,
-    std::stringstream* stream) {
-  if (value == NULL) {
-    *stream << "NULL";
-    return;
-  }
-
-  int old_precision = stream->precision();
-  std::ios_base::fmtflags old_flags = stream->flags();
-  if (scale > -1) {
-    stream->precision(scale);
-    // Setting 'fixed' causes precision to set the number of digits printed 
after the
-    // decimal (by default it sets the maximum number of digits total).
-    *stream << std::fixed;
-  }
-
-  const StringValue* string_val = NULL;
-  switch (type.type) {
-    case TYPE_BOOLEAN: {
-      bool val = *reinterpret_cast<const bool*>(value);
-      *stream << (val ? "true" : "false");
-      return;
-    }
-    case TYPE_TINYINT:
-      // Extra casting for chars since they should not be interpreted as ASCII.
-      *stream << static_cast<int>(*reinterpret_cast<const int8_t*>(value));
-      break;
-    case TYPE_SMALLINT:
-      *stream << *reinterpret_cast<const int16_t*>(value);
-      break;
-    case TYPE_INT:
-      *stream << *reinterpret_cast<const int32_t*>(value);
-      break;
-    case TYPE_BIGINT:
-      *stream << *reinterpret_cast<const int64_t*>(value);
-      break;
-    case TYPE_FLOAT:
-      {
-        float val = *reinterpret_cast<const float*>(value);
-        if (LIKELY(std::isfinite(val))) {
-          *stream << val;
-        } else if (std::isinf(val)) {
-          // 'Infinity' is Java's text representation of inf. By staying close 
to Java, we
-          // allow Hive to read text tables containing non-finite values 
produced by
-          // Impala. (The same logic applies to 'NaN', below).
-          *stream << (val < 0 ? "-Infinity" : "Infinity");
-        } else if (std::isnan(val)) {
-          *stream << "NaN";
-        }
-      }
-      break;
-    case TYPE_DOUBLE:
-      {
-        double val = *reinterpret_cast<const double*>(value);
-        if (LIKELY(std::isfinite(val))) {
-          *stream << val;
-        } else if (std::isinf(val)) {
-          // See TYPE_FLOAT for rationale.
-          *stream << (val < 0 ? "-Infinity" : "Infinity");
-        } else if (std::isnan(val)) {
-          *stream << "NaN";
-        }
-      }
-      break;
-    case TYPE_VARCHAR:
-    case TYPE_STRING:
-      string_val = reinterpret_cast<const StringValue*>(value);
-      if (type.type == TYPE_VARCHAR) DCHECK(string_val->len <= type.len);
-      stream->write(string_val->ptr, string_val->len);
-      break;
-    case TYPE_TIMESTAMP:
-      *stream << *reinterpret_cast<const TimestampValue*>(value);
-      break;
-    case TYPE_CHAR:
-      stream->write(StringValue::CharSlotToPtr(value, type), type.len);
-      break;
-    case TYPE_DECIMAL:
-      switch (type.GetByteSize()) {
-        case 4:
-          *stream << reinterpret_cast<const 
Decimal4Value*>(value)->ToString(type);
-          break;
-        case 8:
-          *stream << reinterpret_cast<const 
Decimal8Value*>(value)->ToString(type);
-          break;
-        case 16:
-          *stream << reinterpret_cast<const 
Decimal16Value*>(value)->ToString(type);
-          break;
-        default:
-          DCHECK(false) << type;
-      }
-      break;
-    default:
-      DCHECK(false);
-  }
-  stream->precision(old_precision);
-  // Undo setting stream to fixed
-  stream->flags(old_flags);
-}
-
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/util/bloom-filter.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc
index 7d8c8f7..6fd53f5 100644
--- a/be/src/util/bloom-filter.cc
+++ b/be/src/util/bloom-filter.cc
@@ -83,6 +83,78 @@ void BloomFilter::ToThrift(const BloomFilter* filter, 
TBloomFilter* thrift) {
   filter->ToThrift(thrift);
 }
 
+// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. 
However, we
+// compile with -fno-strict-aliasing.
+
+void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) 
{
+  // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit 
word. It is
+  // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads 
in the second
+  // part of this method.
+  uint32_t new_bucket[8] __attribute__((aligned(16)));
+  for (int i = 0; i < 8; ++i) {
+    // Rehash 'hash' and use the top LOG_BUCKET_WORD_BITS bits, following 
Dietzfelbinger.
+    new_bucket[i] =
+        (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - 
LOG_BUCKET_WORD_BITS);
+    new_bucket[i] = 1U << new_bucket[i];
+  }
+  for (int i = 0; i < 2; ++i) {
+    __m128i new_bucket_sse =
+        _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i));
+    __m128i* existing_bucket = 
reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]);
+    *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse);
+  }
+}
+
+__m256i BloomFilter::MakeMask(const uint32_t hash) {
+   const __m256i ones = _mm256_set1_epi32(1);
+   const __m256i rehash = _mm256_setr_epi32(IMPALA_BLOOM_HASH_CONSTANTS);
+  // Load hash into a YMM register, repeated eight times
+  __m256i hash_data = _mm256_set1_epi32(hash);
+  // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by 
eight different
+  // odd constants, then keep the 5 most significant bits from each product.
+  hash_data = _mm256_mullo_epi32(rehash, hash_data);
+  hash_data = _mm256_srli_epi32(hash_data, 27);
+  // Use these 5 bits to shift a single bit to a location in each 32-bit lane
+  return _mm256_sllv_epi32(ones, hash_data);
+}
+
+void BloomFilter::BucketInsertAVX2(
+    const uint32_t bucket_idx, const uint32_t hash) {
+  const __m256i mask = MakeMask(hash);
+  __m256i* const bucket = &reinterpret_cast<__m256i*>(directory_)[bucket_idx];
+  _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
+  // For SSE compatibility, unset the high bits of each YMM register so SSE 
instructions
+  // dont have to save them off before using XMM registers.
+  _mm256_zeroupper();
+}
+
+bool BloomFilter::BucketFindAVX2(
+    const uint32_t bucket_idx, const uint32_t hash) const {
+  const __m256i mask = MakeMask(hash);
+  const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
+  // We should return true if 'bucket' has a one wherever 'mask' does. 
_mm256_testc_si256
+  // takes the negation of its first argument and ands that with its second 
argument. In
+  // our case, the result is zero everywhere iff there is a one in 'bucket' 
wherever
+  // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 
0 otherwise.
+  const bool result = _mm256_testc_si256(bucket, mask);
+  _mm256_zeroupper();
+  return result;
+}
+
+bool BloomFilter::BucketFind(
+    const uint32_t bucket_idx, const uint32_t hash) const {
+  for (int i = 0; i < BUCKET_WORDS; ++i) {
+    BucketWord hval =
+        (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - 
LOG_BUCKET_WORD_BITS);
+    hval = 1U << hval;
+    if (!(directory_[bucket_idx][i] & hval)) {
+      return false;
+    }
+  }
+  return true;
+}
+
+
 void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) {
   DCHECK(out != NULL);
   DCHECK_EQ(in.log_heap_space, out->log_heap_space);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/util/bloom-filter.h
----------------------------------------------------------------------
diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h
index 7a94995..4342814 100644
--- a/be/src/util/bloom-filter.h
+++ b/be/src/util/bloom-filter.h
@@ -25,9 +25,9 @@
 
 #include <immintrin.h>
 
-#include "gutil/macros.h"
-
+#include "common/compiler-util.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
+#include "gutil/macros.h"
 #include "runtime/buffered-block-mgr.h"
 
 namespace impala {
@@ -173,7 +173,7 @@ class BloomFilter {
 // the advantage of requiring fewer random bits: log2(32) * 8 = 5 * 8 = 40 
random bits for
 // a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard 
Bloom filter.
 
-inline void BloomFilter::Insert(const uint32_t hash) {
+inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) {
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
     BucketInsertAVX2(bucket_idx, hash);
@@ -182,7 +182,7 @@ inline void BloomFilter::Insert(const uint32_t hash) {
   }
 }
 
-inline bool BloomFilter::Find(const uint32_t hash) const {
+inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const {
   const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_;
   if (CpuInfo::IsSupported(CpuInfo::AVX2)) {
     return BucketFindAVX2(bucket_idx, hash);
@@ -191,78 +191,6 @@ inline bool BloomFilter::Find(const uint32_t hash) const {
   }
 }
 
-// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. 
However, we
-// compile with -fno-strict-aliasing.
-
-inline void BloomFilter::BucketInsert(const uint32_t bucket_idx, const 
uint32_t hash) {
-  // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit 
word. It is
-  // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads 
in the second
-  // part of this method.
-  uint32_t new_bucket[8] __attribute__((aligned(16)));
-  for (int i = 0; i < 8; ++i) {
-    // Rehash 'hash' and use the top LOG_BUCKET_WORD_BITS bits, following 
Dietzfelbinger.
-    new_bucket[i] =
-        (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - 
LOG_BUCKET_WORD_BITS);
-    new_bucket[i] = 1U << new_bucket[i];
-  }
-  for (int i = 0; i < 2; ++i) {
-    __m128i new_bucket_sse =
-        _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i));
-    __m128i* existing_bucket = 
reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]);
-    *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse);
-  }
-}
-
-inline __m256i BloomFilter::MakeMask(const uint32_t hash) {
-   const __m256i ones = _mm256_set1_epi32(1);
-   const __m256i rehash = _mm256_setr_epi32(IMPALA_BLOOM_HASH_CONSTANTS);
-  // Load hash into a YMM register, repeated eight times
-  __m256i hash_data = _mm256_set1_epi32(hash);
-  // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by 
eight different
-  // odd constants, then keep the 5 most significant bits from each product.
-  hash_data = _mm256_mullo_epi32(rehash, hash_data);
-  hash_data = _mm256_srli_epi32(hash_data, 27);
-  // Use these 5 bits to shift a single bit to a location in each 32-bit lane
-  return _mm256_sllv_epi32(ones, hash_data);
-}
-
-inline void BloomFilter::BucketInsertAVX2(
-    const uint32_t bucket_idx, const uint32_t hash) {
-  const __m256i mask = MakeMask(hash);
-  __m256i* const bucket = &reinterpret_cast<__m256i*>(directory_)[bucket_idx];
-  _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask));
-  // For SSE compatibility, unset the high bits of each YMM register so SSE 
instructions
-  // dont have to save them off before using XMM registers.
-  _mm256_zeroupper();
-}
-
-inline bool BloomFilter::BucketFindAVX2(
-    const uint32_t bucket_idx, const uint32_t hash) const {
-  const __m256i mask = MakeMask(hash);
-  const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx];
-  // We should return true if 'bucket' has a one wherever 'mask' does. 
_mm256_testc_si256
-  // takes the negation of its first argument and ands that with its second 
argument. In
-  // our case, the result is zero everywhere iff there is a one in 'bucket' 
wherever
-  // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 
0 otherwise.
-  const bool result = _mm256_testc_si256(bucket, mask);
-  _mm256_zeroupper();
-  return result;
-}
-
-inline bool BloomFilter::BucketFind(
-    const uint32_t bucket_idx, const uint32_t hash) const {
-  for (int i = 0; i < BUCKET_WORDS; ++i) {
-    BucketWord hval =
-        (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - 
LOG_BUCKET_WORD_BITS);
-    hval = 1U << hval;
-    if (!(directory_[bucket_idx][i] & hval)) {
-      return false;
-    }
-  }
-  return true;
-}
-
 }  // namespace impala
 
-#undef IMPALA_BLOOM_HASH_CONSTANTS
 #endif  // IMPALA_UTIL_BLOOM_H

Reply via email to