IMPALA-4231: fix codegen time regression

The commit "IMPALA-3567 Part 2, IMPALA-3899: factor out PHJ builder" slightly increased codegen time, which caused TPC-H Q2 to sometimes regress significantly because of races in runtime filter arrival.
This patch attempts to fix the regression by improving codegen time in a few places.

* Revert to using the old bool/Status return pattern. The regular Status return pattern results in significantly more complex IR because it has to emit code to copy and free statuses. I spent some time trying to convince it to optimise the extra code out, but didn't have much success.
* Remove some code that cannot be specialized from cross-compilation.
* Add noexcept to some functions that are used from the IR to ensure exception-handling IR is not emitted. This is less important after the first change but should still help produce cleaner IR.

Performance: I was able to reproduce a regression locally, which is fixed by this patch. I'm in the process of trying to verify the fix on a cluster.

Change-Id: Idf0fdedabd488550b6db90167a30c582949d608d
Reviewed-on: http://gerrit.cloudera.org:8080/4623
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c7fe4385 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c7fe4385 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c7fe4385 Branch: refs/heads/hadoop-next Commit: c7fe4385d927509443a1c4e2c6e9a802d2dcf63b Parents: 89b41c6 Author: Tim Armstrong <[email protected]> Authored: Fri Sep 30 15:18:54 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Fri Oct 14 02:53:59 2016 +0000 ---------------------------------------------------------------------- be/src/common/status.h | 30 +-- be/src/exec/hash-table.cc | 8 +- be/src/exec/hash-table.h | 16 +- be/src/exec/partitioned-aggregation-node-ir.cc | 6 +- be/src/exec/partitioned-aggregation-node.cc | 10 +- be/src/exec/partitioned-aggregation-node.h | 9 +- be/src/exec/partitioned-hash-join-builder-ir.cc | 20 +- be/src/exec/partitioned-hash-join-builder.cc | 19 +- 
be/src/exec/partitioned-hash-join-builder.h | 14 +- be/src/exec/partitioned-hash-join-node-ir.cc | 33 ++-- be/src/exec/partitioned-hash-join-node.h | 9 +- be/src/runtime/buffered-tuple-stream.cc | 21 ++- be/src/runtime/buffered-tuple-stream.h | 31 +-- be/src/runtime/buffered-tuple-stream.inline.h | 8 +- be/src/runtime/raw-value.cc | 169 +++++++++++++++++ be/src/runtime/raw-value.h | 7 +- be/src/runtime/raw-value.inline.h | 188 ------------------- be/src/util/bloom-filter.cc | 72 +++++++ be/src/util/bloom-filter.h | 80 +------- 19 files changed, 382 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/common/status.h ---------------------------------------------------------------------- diff --git a/be/src/common/status.h b/be/src/common/status.h index 35a9a94..9a9ea67 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -99,6 +99,10 @@ class Status { if (UNLIKELY(status.msg_ != NULL)) CopyMessageFrom(status); } + /// Move constructor that moves the error message (if any) and resets 'other' to the + /// default OK Status. + ALWAYS_INLINE Status(Status&& other) : msg_(other.msg_) { other.msg_ = NULL; } + /// Status using only the error code as a parameter. This can be used for error messages /// that don't take format parameters. Status(TErrorCode::type code); @@ -153,6 +157,15 @@ class Status { return *this; } + /// Move assignment that moves the error message (if any) and resets 'other' to the + /// default OK Status. + ALWAYS_INLINE Status& operator=(Status&& other) { + if (UNLIKELY(msg_ != NULL)) FreeMessage(); + msg_ = other.msg_; + other.msg_ = NULL; + return *this; + } + ALWAYS_INLINE ~Status() { // The UNLIKELY and inlining here are important hints for the compiler to // streamline the common case of Status::OK(). 
Use FreeMessage() which is @@ -244,21 +257,12 @@ class Status { }; /// some generally useful macros -#define RETURN_IF_ERROR(stmt) \ - do { \ - Status __status__ = (stmt); \ - if (UNLIKELY(!__status__.ok())) return __status__; \ +#define RETURN_IF_ERROR(stmt) \ + do { \ + Status __status__ = (stmt); \ + if (UNLIKELY(!__status__.ok())) return std::move(__status__); \ } while (false) -#define RETURN_IF_ERROR_PREPEND(expr, prepend) \ - do { \ - Status __status__ = (stmt); \ - if (UNLIKELY(!__status__.ok())) { \ - return Status(strings::Substitute("$0: $1", prepend, __status__.GetDetail())); \ - } \ - } while (false) - - #define ABORT_IF_ERROR(stmt) \ do { \ Status __status__ = (stmt); \ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/hash-table.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index 0d780b9..6626b33 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -149,7 +149,7 @@ uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const { } uint32_t HashTableCtx::HashRow( - const uint8_t* expr_values, const uint8_t* expr_values_null) const { + const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept { DCHECK_LT(level_, seeds_.size()); if (expr_values_cache_.var_result_offset() == -1) { /// This handles NULLs implicitly since a constant seed value was put @@ -162,7 +162,7 @@ uint32_t HashTableCtx::HashRow( } bool HashTableCtx::EvalRow(const TupleRow* row, const vector<ExprContext*>& ctxs, - uint8_t* expr_values, uint8_t* expr_values_null) { + uint8_t* expr_values, uint8_t* expr_values_null) noexcept { bool has_null = false; for (int i = 0; i < ctxs.size(); ++i) { void* loc = expr_values_cache_.ExprValuePtr(expr_values, i); @@ -213,7 +213,7 @@ uint32_t HashTableCtx::HashVariableLenRow(const uint8_t* expr_values, template <bool FORCE_NULL_EQUALITY> bool HashTableCtx::Equals(const 
TupleRow* build_row, const uint8_t* expr_values, - const uint8_t* expr_values_null) const { + const uint8_t* expr_values_null) const noexcept { for (int i = 0; i < build_expr_ctxs_.size(); ++i) { void* val = build_expr_ctxs_[i]->GetValue(build_row); if (val == NULL) { @@ -331,7 +331,7 @@ void HashTableCtx::ExprValuesCache::ResetIterators() { cur_expr_values_hash_ = expr_values_hash_array_.get(); } -void HashTableCtx::ExprValuesCache::Reset() { +void HashTableCtx::ExprValuesCache::Reset() noexcept { ResetIterators(); // Set the end pointer after resetting the other pointers so they point to // the same location. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/hash-table.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index fead1f7..4edd130 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -256,7 +256,7 @@ class HashTableCtx { void Close(MemTracker* tracker); /// Resets the cache states (iterators, end pointers etc) before writing. - void Reset(); + void Reset() noexcept; /// Resets the iterators to the start before reading. Will record the current position /// of the iterators in end pointer before resetting so AtEnd() can determine if all @@ -406,7 +406,7 @@ class HashTableCtx { /// This will be replaced by codegen. We don't want this inlined for replacing /// with codegen'd functions so the function name does not change. uint32_t IR_NO_INLINE HashRow( - const uint8_t* expr_values, const uint8_t* expr_values_null) const; + const uint8_t* expr_values, const uint8_t* expr_values_null) const noexcept; /// Wrapper function for calling correct HashUtil function in non-codegen'd case. 
uint32_t Hash(const void* input, int len, uint32_t hash) const; @@ -416,15 +416,15 @@ class HashTableCtx { /// inlined when cross compiled because we need to be able to differentiate between /// EvalBuildRow and EvalProbeRow by name and the build/probe exprs are baked into the /// codegen'd function. - bool IR_NO_INLINE EvalBuildRow(const TupleRow* row, uint8_t* expr_values, - uint8_t* expr_values_null) { + bool IR_NO_INLINE EvalBuildRow( + const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept { return EvalRow(row, build_expr_ctxs_, expr_values, expr_values_null); } /// Evaluate 'row' over probe exprs, storing the values into 'expr_values' and nullness /// into 'expr_values_null'. This will be replaced by codegen. - bool IR_NO_INLINE EvalProbeRow(const TupleRow* row, uint8_t* expr_values, - uint8_t* expr_values_null) { + bool IR_NO_INLINE EvalProbeRow( + const TupleRow* row, uint8_t* expr_values, uint8_t* expr_values_null) noexcept { return EvalRow(row, probe_expr_ctxs_, expr_values, expr_values_null); } @@ -437,7 +437,7 @@ class HashTableCtx { /// 'expr_values_null'. Returns whether any expr evaluated to NULL. This will be /// replaced by codegen. bool EvalRow(const TupleRow* row, const std::vector<ExprContext*>& ctxs, - uint8_t* expr_values, uint8_t* expr_values_null); + uint8_t* expr_values, uint8_t* expr_values_null) noexcept; /// Returns true if the values of build_exprs evaluated over 'build_row' equal the /// values in 'expr_values' with nullness 'expr_values_null'. FORCE_NULL_EQUALITY is @@ -445,7 +445,7 @@ class HashTableCtx { /// 'finds_nulls_'. This will be replaced by codegen. template <bool FORCE_NULL_EQUALITY> bool IR_NO_INLINE Equals(const TupleRow* build_row, const uint8_t* expr_values, - const uint8_t* expr_values_null) const; + const uint8_t* expr_values_null) const noexcept; /// Helper function that calls Equals() with the current row. Always inlined so that /// it does not appear in cross-compiled IR. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc index ed95844..83362e7 100644 --- a/be/src/exec/partitioned-aggregation-node-ir.cc +++ b/be/src/exec/partitioned-aggregation-node-ir.cc @@ -153,7 +153,7 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__ insert_it.SetTuple(intermediate_tuple, hash); return Status::OK(); } else if (!process_batch_status_.ok()) { - return process_batch_status_; + return std::move(process_batch_status_); } // We did not have enough memory to add intermediate_tuple to the stream. @@ -198,13 +198,13 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize, !TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], GetHashTable(partition_idx), in_row, hash, &remaining_capacity[partition_idx], &process_batch_status_)) { - RETURN_IF_ERROR(process_batch_status_); + RETURN_IF_ERROR(std::move(process_batch_status_)); // Tuple is not going into hash table, add it to the output batch. 
Tuple* intermediate_tuple = ConstructIntermediateTuple(agg_fn_ctxs_, out_batch->tuple_data_pool(), &process_batch_status_); if (UNLIKELY(intermediate_tuple == NULL)) { DCHECK(!process_batch_status_.ok()); - return process_batch_status_; + return std::move(process_batch_status_); } UpdateTuple(&agg_fn_ctxs_[0], intermediate_tuple, in_row); out_batch_iterator.Get()->SetTuple(0, intermediate_tuple); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index f926725..629e407 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -965,7 +965,7 @@ Tuple* PartitionedAggregationNode::ConstructSingletonOutputTuple( } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( - const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) { + const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) noexcept { const int fixed_size = intermediate_tuple_desc_->byte_size(); const int varlen_size = GroupingExprsVarlenSize(); const int tuple_data_size = fixed_size + varlen_size; @@ -985,8 +985,8 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( } Tuple* PartitionedAggregationNode::ConstructIntermediateTuple( - const vector<FunctionContext*>& agg_fn_ctxs, - BufferedTupleStream* stream, Status* status) { + const vector<FunctionContext*>& agg_fn_ctxs, BufferedTupleStream* stream, + Status* status) noexcept { DCHECK(stream != NULL && status != NULL); // Allocate space for the entire tuple in the stream. 
const int fixed_size = intermediate_tuple_desc_->byte_size(); @@ -1090,8 +1090,8 @@ void PartitionedAggregationNode::InitAggSlots( } } -void PartitionedAggregationNode::UpdateTuple(FunctionContext** agg_fn_ctxs, - Tuple* tuple, TupleRow* row, bool is_merge) { +void PartitionedAggregationNode::UpdateTuple( + FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, bool is_merge) noexcept { DCHECK(tuple != NULL || aggregate_evaluators_.empty()); for (int i = 0; i < aggregate_evaluators_.size(); ++i) { if (is_merge) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-aggregation-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index c766ab2..0c0f3e8 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -444,14 +444,13 @@ class PartitionedAggregationNode : public ExecNode { /// full, it will attempt to switch to IO-buffers. Tuple* ConstructIntermediateTuple( const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, - BufferedTupleStream* stream, Status* status); + BufferedTupleStream* stream, Status* status) noexcept; /// Constructs intermediate tuple, allocating memory from pool instead of the stream. /// Returns NULL and sets status if there is not enough memory to allocate the tuple. Tuple* ConstructIntermediateTuple( - const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, - MemPool* pool, Status* status); - + const std::vector<impala_udf::FunctionContext*>& agg_fn_ctxs, MemPool* pool, + Status* status) noexcept; /// Returns the number of bytes of variable-length data for the grouping values stored /// in 'ht_ctx_'. @@ -477,7 +476,7 @@ class PartitionedAggregationNode : public ExecNode { /// This function is replaced by codegen (which is why we don't use a vector argument /// for agg_fn_ctxs).. 
Any var-len data is allocated from the FunctionContexts. void UpdateTuple(impala_udf::FunctionContext** agg_fn_ctxs, Tuple* tuple, TupleRow* row, - bool is_merge = false); + bool is_merge = false) noexcept; /// Called on the intermediate tuple of each group after all input rows have been /// consumed and aggregated. Computes the final aggregate values to be returned in http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc index 21fd9e4..11974e3 100644 --- a/be/src/exec/partitioned-hash-join-builder-ir.cc +++ b/be/src/exec/partitioned-hash-join-builder-ir.cc @@ -29,15 +29,16 @@ using namespace impala; -inline Status PhjBuilder::AppendRow(BufferedTupleStream* stream, TupleRow* row) { - Status status; - if (LIKELY(stream->AddRow(row, &status))) return Status::OK(); - RETURN_IF_ERROR(status); - return AppendRowStreamFull(stream, row); +inline bool PhjBuilder::AppendRow( + BufferedTupleStream* stream, TupleRow* row, Status* status) { + if (LIKELY(stream->AddRow(row, status))) return true; + if (UNLIKELY(!status->ok())) return false; + return AppendRowStreamFull(stream, row, status); } Status PhjBuilder::ProcessBuildBatch( RowBatch* build_batch, HashTableCtx* ctx, bool build_filters) { + Status status; HashTableCtx::ExprValuesCache* expr_vals_cache = ctx->expr_values_cache(); expr_vals_cache->Reset(); FOREACH_ROW(build_batch, 0, build_batch_iter) { @@ -47,7 +48,10 @@ Status PhjBuilder::ProcessBuildBatch( // TODO: remove with codegen/template // If we are NULL aware and this build row has NULL in the eq join slot, // append it to the null_aware partition. We will need it later. 
- RETURN_IF_ERROR(AppendRow(null_aware_partition_->build_rows(), build_row)); + if (UNLIKELY( + !AppendRow(null_aware_partition_->build_rows(), build_row, &status))) { + return std::move(status); + } } continue; } @@ -66,7 +70,9 @@ Status PhjBuilder::ProcessBuildBatch( const uint32_t hash = expr_vals_cache->CurExprValuesHash(); const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS); Partition* partition = hash_partitions_[partition_idx]; - RETURN_IF_ERROR(AppendRow(partition->build_rows(), build_row)); + if (UNLIKELY(!AppendRow(partition->build_rows(), build_row, &status))) { + return std::move(status); + } } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc index b17bbff..bf5b42a 100644 --- a/be/src/exec/partitioned-hash-join-builder.cc +++ b/be/src/exec/partitioned-hash-join-builder.cc @@ -260,23 +260,26 @@ Status PhjBuilder::CreateHashPartitions(int level) { return Status::OK(); } -Status PhjBuilder::AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row) { - Status status; +bool PhjBuilder::AppendRowStreamFull( + BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept { while (true) { // Check if the stream is still using small buffers and try to switch to IO-buffers. if (stream->using_small_buffers()) { bool got_buffer; - RETURN_IF_ERROR(stream->SwitchToIoBuffers(&got_buffer)); + *status = stream->SwitchToIoBuffers(&got_buffer); + if (!status->ok()) return false; + if (got_buffer) { - if (LIKELY(stream->AddRow(row, &status))) return Status::OK(); - RETURN_IF_ERROR(status); + if (LIKELY(stream->AddRow(row, status))) return true; + if (!status->ok()) return false; } } // We ran out of memory. Pick a partition to spill. 
If we ran out of unspilled // partitions, SpillPartition() will return an error status. - RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT)); - if (stream->AddRow(row, &status)) return Status::OK(); - RETURN_IF_ERROR(status); + *status = SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT); + if (!status->ok()) return false; + if (stream->AddRow(row, status)) return true; + if (!status->ok()) return false; // Spilling one partition does not guarantee we can append a row. Keep // spilling until we can append this row. } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-builder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h index 23822b2..7f81e5a 100644 --- a/be/src/exec/partitioned-hash-join-builder.h +++ b/be/src/exec/partitioned-hash-join-builder.h @@ -261,14 +261,18 @@ class PhjBuilder : public DataSink { /// Append 'row' to 'stream'. In the common case, appending the row to the stream /// immediately succeeds. Otherwise this function falls back to the slower path of - /// AppendRowStreamFull(), which may spill partitions to free memory. Returns an error - /// if it was unable to append the row, even after spilling partitions. - Status AppendRow(BufferedTupleStream* stream, TupleRow* row); + /// AppendRowStreamFull(), which may spill partitions to free memory. Returns false + /// and sets 'status' if it was unable to append the row, even after spilling + /// partitions. This odd return convention is used to avoid emitting unnecessary code + /// for ~Status in perf-critical code. + bool AppendRow(BufferedTupleStream* stream, TupleRow* row, Status* status); /// Slow path for AppendRow() above. It is called when the stream has failed to append /// the row. 
We need to find more memory by either switching to IO-buffers, in case the - /// stream still uses small buffers, or spilling a partition. - Status AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row); + /// stream still uses small buffers, or spilling a partition. Returns false and sets + /// 'status' if it was unable to append the row, even after spilling partitions. + bool AppendRowStreamFull( + BufferedTupleStream* stream, TupleRow* row, Status* status) noexcept; /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed /// to the Spill() call for the selected partition. The current policy is to spill the http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-node-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc index 44cb14b..bbed06d 100644 --- a/be/src/exec/partitioned-hash-join-node-ir.cc +++ b/be/src/exec/partitioned-hash-join-node-ir.cc @@ -149,10 +149,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins( // build side. For those rows, we need to process the remaining join // predicates later. 
if (builder_->null_aware_partition()->build_rows()->num_rows() != 0) { - if (num_other_join_conjuncts > 0) { - *status = AppendProbeRow( - null_aware_probe_partition_->probe_rows(), current_probe_row_); - if (UNLIKELY(!status->ok())) return false; + if (num_other_join_conjuncts > 0 + && UNLIKELY(!AppendProbeRow(null_aware_probe_partition_->probe_rows(), + current_probe_row_, status))) { + DCHECK(!status->ok()); + return false; } return true; } @@ -217,7 +218,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins( return true; } -template<int const JoinOp> +template <int const JoinOp> bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow( ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts, ExprContext* const* conjunct_ctxs, int num_conjuncts, @@ -282,8 +283,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( skip_row = true; } else { // Condition 3 above. - *status = AppendProbeRow(null_probe_rows_.get(), current_probe_row_); - if (UNLIKELY(!status->ok())) return false; + if (UNLIKELY( + !AppendProbeRow(null_probe_rows_.get(), current_probe_row_, status))) { + DCHECK(!status->ok()); + return false; + } matched_null_probe_.push_back(false); skip_row = true; } @@ -306,8 +310,10 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow( // Skip the current row if we manage to append to the spilled partition's BTS. // Otherwise, we need to bail out and report the failure. 
BufferedTupleStream* probe_rows = probe_partition->probe_rows(); - *status = AppendProbeRow(probe_rows, current_probe_row_); - if (UNLIKELY(!status->ok())) return false; + if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) { + DCHECK(!status->ok()); + return false; + } skip_row = true; } } @@ -426,15 +432,12 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode return num_rows_added; } -inline Status PartitionedHashJoinNode::AppendProbeRow( - BufferedTupleStream* stream, TupleRow* row) { +inline bool PartitionedHashJoinNode::AppendProbeRow( + BufferedTupleStream* stream, TupleRow* row, Status* status) { DCHECK(stream->has_write_block()); DCHECK(!stream->using_small_buffers()); DCHECK(!stream->is_pinned()); - Status status; - if (LIKELY(stream->AddRow(row, &status))) return Status::OK(); - DCHECK(!status.ok()); - return status; + return stream->AddRow(row, status); } template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/exec/partitioned-hash-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index 9827788..5b9264c 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -166,7 +166,10 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have /// a write buffer allocated, so this will succeed unless an error is encountered. - Status AppendProbeRow(BufferedTupleStream* stream, TupleRow* row); + /// Returns false and sets 'status' to an error if an error is encountered. This odd + /// return convention is used to avoid emitting unnecessary code for ~Status in perf- + /// critical code. 
+ bool AppendProbeRow(BufferedTupleStream* stream, TupleRow* row, Status* status); /// Probes the hash table for rows matching the current probe row and appends /// all the matching build rows (with probe row) to output batch. Returns true @@ -267,8 +270,8 @@ class PartitionedHashJoinNode : public BlockingJoinNode { /// probe_batch_ is entirely consumed. /// For RIGHT_ANTI_JOIN, all this function does is to mark whether each build row /// had a match. - /// Returns the number of rows added to out_batch; -1 on error (and *status will be - /// set). This function doesn't commit rows to the output batch so it's the caller's + /// Returns the number of rows added to out_batch; -1 on error (and *status will + /// be set). This function doesn't commit rows to the output batch so it's the caller's /// responsibility to do so. template<int const JoinOp> int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, HashTableCtx* ht_ctx, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc index b2fce45..e3b3b4a 100644 --- a/be/src/runtime/buffered-tuple-stream.cc +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -222,8 +222,8 @@ Status BufferedTupleStream::UnpinBlock(BufferedBlockMgr::Block* block) { return Status::OK(); } -Status BufferedTupleStream::NewWriteBlock(int64_t block_len, int64_t null_indicators_size, - bool* got_block) { +Status BufferedTupleStream::NewWriteBlock( + int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept { DCHECK(!closed_); DCHECK_GE(null_indicators_size, 0); *got_block = false; @@ -282,7 +282,8 @@ Status BufferedTupleStream::NewWriteBlock(int64_t block_len, int64_t null_indica return Status::OK(); } -Status BufferedTupleStream::NewWriteBlockForRow(int64_t row_size, bool* got_block) { +Status 
BufferedTupleStream::NewWriteBlockForRow( + int64_t row_size, bool* got_block) noexcept { int64_t block_len; int64_t null_indicators_size; if (use_small_buffers_) { @@ -694,7 +695,7 @@ void BufferedTupleStream::FixUpCollectionsForRead(const vector<SlotDescriptor*>& } } -int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const { +int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const noexcept { int64_t size = 0; if (has_nullable_tuple_) { for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) { @@ -733,7 +734,15 @@ int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const { return size; } -bool BufferedTupleStream::DeepCopy(TupleRow* row) { +bool BufferedTupleStream::AddRowSlow(TupleRow* row, Status* status) noexcept { + bool got_block; + int64_t row_size = ComputeRowSize(row); + *status = NewWriteBlockForRow(row_size, &got_block); + if (!status->ok() || !got_block) return false; + return DeepCopy(row); +} + +bool BufferedTupleStream::DeepCopy(TupleRow* row) noexcept { if (has_nullable_tuple_) { return DeepCopyInternal<true>(row); } else { @@ -744,7 +753,7 @@ bool BufferedTupleStream::DeepCopy(TupleRow* row) { // TODO: this really needs codegen // TODO: in case of duplicate tuples, this can redundantly serialize data. 
template <bool HasNullableTuple> -bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) { +bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) noexcept { if (UNLIKELY(write_block_ == NULL)) return false; DCHECK_GE(write_block_null_indicators_size_, 0); DCHECK(write_block_->is_pinned()) << DebugString() << std::endl http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h index d3bfa81..d138150 100644 --- a/be/src/runtime/buffered-tuple-stream.h +++ b/be/src/runtime/buffered-tuple-stream.h @@ -238,15 +238,17 @@ class BufferedTupleStream { /// Must be called for streams using small buffers to switch to IO-sized buffers. /// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_ /// back to false. - /// TODO: this does not seem like the best mechanism. + /// TODO: IMPALA-3200: remove this when small buffers are removed. Status SwitchToIoBuffers(bool* got_buffer); - /// Adds a single row to the stream. Returns false and sets *status if an error - /// occurred. BufferedTupleStream will do a deep copy of the memory in the row. - /// After AddRow returns false, it should not be called again, unless - /// using_small_buffers_ is true, in which case it is valid to call SwitchToIoBuffers() - /// then AddRow() again. - bool AddRow(TupleRow* row, Status* status); + /// Adds a single row to the stream. Returns true if the append succeeded, returns false + /// and sets 'status' to OK if appending failed but can be retried or returns false and + /// sets 'status' to an error if an error occurred. + /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow() + /// returns an error, it should not be called again. 
If appending failed without an + /// error and the stream is using small buffers, it is valid to call + /// SwitchToIoBuffers() then AddRow() again. + bool AddRow(TupleRow* row, Status* status) noexcept; /// Allocates space to store a row of with fixed length 'fixed_size' and variable /// length data 'varlen_size'. If successful, returns the pointer where fixed length @@ -458,11 +460,15 @@ class BufferedTupleStream { RuntimeProfile::Counter* unpin_timer_; RuntimeProfile::Counter* get_new_block_timer_; + /// The slow path for AddRow() that is called if there is not sufficient space in + /// the current block. + bool AddRowSlow(TupleRow* row, Status* status) noexcept; + /// Copies 'row' into write_block_. Returns false if there is not enough space in /// 'write_block_'. After returning false, write_ptr_ may be left pointing to the /// partially-written row, and no more data can be written to write_block_. template <bool HAS_NULLABLE_TUPLE> - bool DeepCopyInternal(TupleRow* row); + bool DeepCopyInternal(TupleRow* row) noexcept; /// Helper function to copy strings in string_slots from tuple into write_block_. /// Updates write_ptr_ to the end of the string data added. Returns false if the data @@ -480,7 +486,7 @@ class BufferedTupleStream { const std::vector<SlotDescriptor*>& collection_slots); /// Wrapper of the templated DeepCopyInternal() function. - bool DeepCopy(TupleRow* row); + bool DeepCopy(TupleRow* row) noexcept; /// Gets a new block of 'block_len' bytes from the block_mgr_, updating write_block_, /// write_tuple_idx_, write_ptr_ and write_end_ptr_. 'null_indicators_size' is the @@ -488,12 +494,13 @@ class BufferedTupleStream { /// *got_block is set to true if a block was successfully acquired. Null indicators /// (if any) will also be reserved and initialized. If there are no blocks available, /// *got_block is set to false and write_block_ is unchanged. 
- Status NewWriteBlock(int64_t block_len, int64_t null_indicators_size, bool* got_block); + Status NewWriteBlock( + int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept; /// A wrapper around NewWriteBlock(). 'row_size' is the size of the tuple row to be /// appended to this block. This function determines the block size required in order /// to fit the row and null indicators. - Status NewWriteBlockForRow(int64_t row_size, bool* got_block); + Status NewWriteBlockForRow(int64_t row_size, bool* got_block) noexcept; /// Reads the next block from the block_mgr_. This blocks if necessary. /// Updates read_block_, read_ptr_, read_tuple_idx_ and read_end_ptr_. @@ -502,7 +509,7 @@ class BufferedTupleStream { /// Returns the total additional bytes that this row will consume in write_block_ if /// appended to the block. This includes the fixed length part of the row and the /// data for inlined_string_slots_ and inlined_coll_slots_. - int64_t ComputeRowSize(TupleRow* row) const; + int64_t ComputeRowSize(TupleRow* row) const noexcept; /// Unpins block if it is an IO-sized block and updates tracking stats. 
Status UnpinBlock(BufferedBlockMgr::Block* block); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/buffered-tuple-stream.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h index 7a2f247..ba6bb8c 100644 --- a/be/src/runtime/buffered-tuple-stream.inline.h +++ b/be/src/runtime/buffered-tuple-stream.inline.h @@ -25,14 +25,10 @@ namespace impala { -inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) { +inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept { DCHECK(!closed_); if (LIKELY(DeepCopy(row))) return true; - bool got_block; - int64_t row_size = ComputeRowSize(row); - *status = NewWriteBlockForRow(row_size, &got_block); - if (!status->ok() || !got_block) return false; - return DeepCopy(row); + return AddRowSlow(row, status); } inline uint8_t* BufferedTupleStream::AllocateRow(int fixed_size, int varlen_size, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/raw-value.cc b/be/src/runtime/raw-value.cc index 7247b8e..aef58f2 100644 --- a/be/src/runtime/raw-value.cc +++ b/be/src/runtime/raw-value.cc @@ -191,4 +191,173 @@ void RawValue::Write(const void* value, Tuple* tuple, const SlotDescriptor* slot } } +uint32_t RawValue::GetHashValue( + const void* v, const ColumnType& type, uint32_t seed) noexcept { + // The choice of hash function needs to be consistent across all hosts of the cluster. + + // Use HashCombine with arbitrary constant to ensure we don't return seed. 
+ if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed); + + switch (type.type) { + case TYPE_CHAR: + case TYPE_STRING: + case TYPE_VARCHAR: + return RawValue::GetHashValueNonNull<impala::StringValue>( + reinterpret_cast<const StringValue*>(v), type, seed); + case TYPE_BOOLEAN: + return RawValue::GetHashValueNonNull<bool>( + reinterpret_cast<const bool*>(v), type, seed); + case TYPE_TINYINT: + return RawValue::GetHashValueNonNull<int8_t>( + reinterpret_cast<const int8_t*>(v), type, seed); + case TYPE_SMALLINT: + return RawValue::GetHashValueNonNull<int16_t>( + reinterpret_cast<const int16_t*>(v), type, seed); + case TYPE_INT: + return RawValue::GetHashValueNonNull<int32_t>( + reinterpret_cast<const int32_t*>(v), type, seed); + case TYPE_BIGINT: + return RawValue::GetHashValueNonNull<int64_t>( + reinterpret_cast<const int64_t*>(v), type, seed); + case TYPE_FLOAT: + return RawValue::GetHashValueNonNull<float>( + reinterpret_cast<const float*>(v), type, seed); + case TYPE_DOUBLE: + return RawValue::GetHashValueNonNull<double>( + reinterpret_cast<const double*>(v), type, seed); + case TYPE_TIMESTAMP: + return RawValue::GetHashValueNonNull<TimestampValue>( + reinterpret_cast<const TimestampValue*>(v), type, seed); + case TYPE_DECIMAL: + switch (type.GetByteSize()) { + case 4: + return RawValue::GetHashValueNonNull<Decimal4Value>( + reinterpret_cast<const impala::Decimal4Value*>(v), type, seed); + case 8: + return RawValue::GetHashValueNonNull<Decimal8Value>( + reinterpret_cast<const Decimal8Value*>(v), type, seed); + case 16: + return RawValue::GetHashValueNonNull<Decimal16Value>( + reinterpret_cast<const Decimal16Value*>(v), type, seed); + DCHECK(false); + } + default: DCHECK(false); return 0; + } +} + +uint32_t RawValue::GetHashValueFnv(const void* v, const ColumnType& type, uint32_t seed) { + // Use HashCombine with arbitrary constant to ensure we don't return seed. 
+ if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed); + + switch (type.type) { + case TYPE_STRING: + case TYPE_VARCHAR: { + const StringValue* string_value = reinterpret_cast<const StringValue*>(v); + if (string_value->len == 0) { + return HashUtil::HashCombine32(HASH_VAL_EMPTY, seed); + } + return HashUtil::FnvHash64to32(string_value->ptr, string_value->len, seed); + } + case TYPE_BOOLEAN: + return HashUtil::HashCombine32(*reinterpret_cast<const bool*>(v), seed); + case TYPE_TINYINT: return HashUtil::FnvHash64to32(v, 1, seed); + case TYPE_SMALLINT: return HashUtil::FnvHash64to32(v, 2, seed); + case TYPE_INT: return HashUtil::FnvHash64to32(v, 4, seed); + case TYPE_BIGINT: return HashUtil::FnvHash64to32(v, 8, seed); + case TYPE_FLOAT: return HashUtil::FnvHash64to32(v, 4, seed); + case TYPE_DOUBLE: return HashUtil::FnvHash64to32(v, 8, seed); + case TYPE_TIMESTAMP: return HashUtil::FnvHash64to32(v, 12, seed); + case TYPE_CHAR: + return HashUtil::FnvHash64to32(StringValue::CharSlotToPtr(v, type), type.len, seed); + case TYPE_DECIMAL: return HashUtil::FnvHash64to32(v, type.GetByteSize(), seed); + default: DCHECK(false); return 0; + } +} + +void RawValue::PrintValue( + const void* value, const ColumnType& type, int scale, std::stringstream* stream) { + if (value == NULL) { + *stream << "NULL"; + return; + } + + int old_precision = stream->precision(); + std::ios_base::fmtflags old_flags = stream->flags(); + if (scale > -1) { + stream->precision(scale); + // Setting 'fixed' causes precision to set the number of digits printed after the + // decimal (by default it sets the maximum number of digits total). + *stream << std::fixed; + } + + const StringValue* string_val = NULL; + switch (type.type) { + case TYPE_BOOLEAN: { + bool val = *reinterpret_cast<const bool*>(value); + *stream << (val ? "true" : "false"); + return; + } + case TYPE_TINYINT: + // Extra casting for chars since they should not be interpreted as ASCII. 
+ *stream << static_cast<int>(*reinterpret_cast<const int8_t*>(value)); + break; + case TYPE_SMALLINT: *stream << *reinterpret_cast<const int16_t*>(value); break; + case TYPE_INT: *stream << *reinterpret_cast<const int32_t*>(value); break; + case TYPE_BIGINT: *stream << *reinterpret_cast<const int64_t*>(value); break; + case TYPE_FLOAT: { + float val = *reinterpret_cast<const float*>(value); + if (LIKELY(std::isfinite(val))) { + *stream << val; + } else if (std::isinf(val)) { + // 'Infinity' is Java's text representation of inf. By staying close to Java, we + // allow Hive to read text tables containing non-finite values produced by + // Impala. (The same logic applies to 'NaN', below). + *stream << (val < 0 ? "-Infinity" : "Infinity"); + } else if (std::isnan(val)) { + *stream << "NaN"; + } + } break; + case TYPE_DOUBLE: { + double val = *reinterpret_cast<const double*>(value); + if (LIKELY(std::isfinite(val))) { + *stream << val; + } else if (std::isinf(val)) { + // See TYPE_FLOAT for rationale. + *stream << (val < 0 ? 
"-Infinity" : "Infinity"); + } else if (std::isnan(val)) { + *stream << "NaN"; + } + } break; + case TYPE_VARCHAR: + case TYPE_STRING: + string_val = reinterpret_cast<const StringValue*>(value); + if (type.type == TYPE_VARCHAR) DCHECK(string_val->len <= type.len); + stream->write(string_val->ptr, string_val->len); + break; + case TYPE_TIMESTAMP: + *stream << *reinterpret_cast<const TimestampValue*>(value); + break; + case TYPE_CHAR: + stream->write(StringValue::CharSlotToPtr(value, type), type.len); + break; + case TYPE_DECIMAL: + switch (type.GetByteSize()) { + case 4: + *stream << reinterpret_cast<const Decimal4Value*>(value)->ToString(type); + break; + case 8: + *stream << reinterpret_cast<const Decimal8Value*>(value)->ToString(type); + break; + case 16: + *stream << reinterpret_cast<const Decimal16Value*>(value)->ToString(type); + break; + default: DCHECK(false) << type; + } + break; + default: DCHECK(false); + } + stream->precision(old_precision); + // Undo setting stream to fixed + stream->flags(old_flags); +} } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/raw-value.h b/be/src/runtime/raw-value.h index bc76b2c..5ec8ed1 100644 --- a/be/src/runtime/raw-value.h +++ b/be/src/runtime/raw-value.h @@ -54,8 +54,8 @@ class RawValue { /// Returns hash value for 'v' interpreted as 'type'. The resulting hash value /// is combined with the seed value. - static inline uint32_t GetHashValue(const void* v, const ColumnType& type, - uint32_t seed = 0); + static uint32_t GetHashValue( + const void* v, const ColumnType& type, uint32_t seed = 0) noexcept; /// Templatized version of GetHashValue, use if type is known ahead. GetHashValue /// handles nulls. @@ -74,8 +74,7 @@ class RawValue { /// GetHashValue() does not have this property and cannot be safely used as the first /// step in data repartitioning. 
However, GetHashValue() can be significantly faster. /// TODO: fix GetHashValue - static inline uint32_t GetHashValueFnv(const void* v, const ColumnType& type, - uint32_t seed); + static uint32_t GetHashValueFnv(const void* v, const ColumnType& type, uint32_t seed); /// Compares both values. /// Return value is < 0 if v1 < v2, 0 if v1 == v2, > 0 if v1 > v2. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/runtime/raw-value.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/raw-value.inline.h b/be/src/runtime/raw-value.inline.h index a1f1d75..63c9a07 100644 --- a/be/src/runtime/raw-value.inline.h +++ b/be/src/runtime/raw-value.inline.h @@ -211,194 +211,6 @@ inline uint32_t RawValue::GetHashValue(const T* v, const ColumnType& type, if (UNLIKELY(v == NULL)) return HashUtil::HashCombine32(HASH_VAL_NULL, seed); return RawValue::GetHashValueNonNull<T>(v, type, seed); } - -inline uint32_t RawValue::GetHashValue(const void* v, const ColumnType& type, - uint32_t seed) { - //The choice of hash function needs to be consistent across all hosts of the cluster. - - // Use HashCombine with arbitrary constant to ensure we don't return seed. 
- if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed); - - switch (type.type) { - case TYPE_CHAR: - case TYPE_STRING: - case TYPE_VARCHAR: - return RawValue::GetHashValueNonNull<impala::StringValue>( - reinterpret_cast<const StringValue*>(v), type, seed); - case TYPE_BOOLEAN: - return RawValue::GetHashValueNonNull<bool>( - reinterpret_cast<const bool*>(v), type, seed); - case TYPE_TINYINT: - return RawValue::GetHashValueNonNull<int8_t>( - reinterpret_cast<const int8_t*>(v), type, seed); - case TYPE_SMALLINT: - return RawValue::GetHashValueNonNull<int16_t>( - reinterpret_cast<const int16_t*>(v), type, seed); - case TYPE_INT: - return RawValue::GetHashValueNonNull<int32_t>( - reinterpret_cast<const int32_t*>(v), type, seed); - case TYPE_BIGINT: - return RawValue::GetHashValueNonNull<int64_t>( - reinterpret_cast<const int64_t*>(v), type, seed); - case TYPE_FLOAT: - return RawValue::GetHashValueNonNull<float>( - reinterpret_cast<const float*>(v), type, seed); - case TYPE_DOUBLE: - return RawValue::GetHashValueNonNull<double>( - reinterpret_cast<const double*>(v), type, seed); - case TYPE_TIMESTAMP: - return RawValue::GetHashValueNonNull<TimestampValue>( - reinterpret_cast<const TimestampValue*>(v), type, seed); - case TYPE_DECIMAL: - switch(type.GetByteSize()) { - case 4: return - RawValue::GetHashValueNonNull<Decimal4Value>( - reinterpret_cast<const impala::Decimal4Value*>(v), type, seed); - case 8: - return RawValue::GetHashValueNonNull<Decimal8Value>( - reinterpret_cast<const Decimal8Value*>(v), type, seed); - case 16: - return RawValue::GetHashValueNonNull<Decimal16Value>( - reinterpret_cast<const Decimal16Value*>(v), type, seed); - DCHECK(false); - } - default: - DCHECK(false); - return 0; - } -} - -inline uint32_t RawValue::GetHashValueFnv(const void* v, const ColumnType& type, - uint32_t seed) { - // Use HashCombine with arbitrary constant to ensure we don't return seed. 
- if (v == NULL) return HashUtil::HashCombine32(HASH_VAL_NULL, seed); - - switch (type.type ) { - case TYPE_STRING: - case TYPE_VARCHAR: { - const StringValue* string_value = reinterpret_cast<const StringValue*>(v); - if (string_value->len == 0) { - return HashUtil::HashCombine32(HASH_VAL_EMPTY, seed); - } - return HashUtil::FnvHash64to32(string_value->ptr, string_value->len, seed); - } - case TYPE_BOOLEAN: - return HashUtil::HashCombine32(*reinterpret_cast<const bool*>(v), seed); - case TYPE_TINYINT: return HashUtil::FnvHash64to32(v, 1, seed); - case TYPE_SMALLINT: return HashUtil::FnvHash64to32(v, 2, seed); - case TYPE_INT: return HashUtil::FnvHash64to32(v, 4, seed); - case TYPE_BIGINT: return HashUtil::FnvHash64to32(v, 8, seed); - case TYPE_FLOAT: return HashUtil::FnvHash64to32(v, 4, seed); - case TYPE_DOUBLE: return HashUtil::FnvHash64to32(v, 8, seed); - case TYPE_TIMESTAMP: return HashUtil::FnvHash64to32(v, 12, seed); - case TYPE_CHAR: return HashUtil::FnvHash64to32(StringValue::CharSlotToPtr(v, type), - type.len, seed); - case TYPE_DECIMAL: return HashUtil::FnvHash64to32(v, type.GetByteSize(), seed); - default: - DCHECK(false); - return 0; - } -} - -inline void RawValue::PrintValue(const void* value, const ColumnType& type, int scale, - std::stringstream* stream) { - if (value == NULL) { - *stream << "NULL"; - return; - } - - int old_precision = stream->precision(); - std::ios_base::fmtflags old_flags = stream->flags(); - if (scale > -1) { - stream->precision(scale); - // Setting 'fixed' causes precision to set the number of digits printed after the - // decimal (by default it sets the maximum number of digits total). - *stream << std::fixed; - } - - const StringValue* string_val = NULL; - switch (type.type) { - case TYPE_BOOLEAN: { - bool val = *reinterpret_cast<const bool*>(value); - *stream << (val ? "true" : "false"); - return; - } - case TYPE_TINYINT: - // Extra casting for chars since they should not be interpreted as ASCII. 
- *stream << static_cast<int>(*reinterpret_cast<const int8_t*>(value)); - break; - case TYPE_SMALLINT: - *stream << *reinterpret_cast<const int16_t*>(value); - break; - case TYPE_INT: - *stream << *reinterpret_cast<const int32_t*>(value); - break; - case TYPE_BIGINT: - *stream << *reinterpret_cast<const int64_t*>(value); - break; - case TYPE_FLOAT: - { - float val = *reinterpret_cast<const float*>(value); - if (LIKELY(std::isfinite(val))) { - *stream << val; - } else if (std::isinf(val)) { - // 'Infinity' is Java's text representation of inf. By staying close to Java, we - // allow Hive to read text tables containing non-finite values produced by - // Impala. (The same logic applies to 'NaN', below). - *stream << (val < 0 ? "-Infinity" : "Infinity"); - } else if (std::isnan(val)) { - *stream << "NaN"; - } - } - break; - case TYPE_DOUBLE: - { - double val = *reinterpret_cast<const double*>(value); - if (LIKELY(std::isfinite(val))) { - *stream << val; - } else if (std::isinf(val)) { - // See TYPE_FLOAT for rationale. - *stream << (val < 0 ? 
"-Infinity" : "Infinity"); - } else if (std::isnan(val)) { - *stream << "NaN"; - } - } - break; - case TYPE_VARCHAR: - case TYPE_STRING: - string_val = reinterpret_cast<const StringValue*>(value); - if (type.type == TYPE_VARCHAR) DCHECK(string_val->len <= type.len); - stream->write(string_val->ptr, string_val->len); - break; - case TYPE_TIMESTAMP: - *stream << *reinterpret_cast<const TimestampValue*>(value); - break; - case TYPE_CHAR: - stream->write(StringValue::CharSlotToPtr(value, type), type.len); - break; - case TYPE_DECIMAL: - switch (type.GetByteSize()) { - case 4: - *stream << reinterpret_cast<const Decimal4Value*>(value)->ToString(type); - break; - case 8: - *stream << reinterpret_cast<const Decimal8Value*>(value)->ToString(type); - break; - case 16: - *stream << reinterpret_cast<const Decimal16Value*>(value)->ToString(type); - break; - default: - DCHECK(false) << type; - } - break; - default: - DCHECK(false); - } - stream->precision(old_precision); - // Undo setting stream to fixed - stream->flags(old_flags); -} - } #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/util/bloom-filter.cc ---------------------------------------------------------------------- diff --git a/be/src/util/bloom-filter.cc b/be/src/util/bloom-filter.cc index 7d8c8f7..6fd53f5 100644 --- a/be/src/util/bloom-filter.cc +++ b/be/src/util/bloom-filter.cc @@ -83,6 +83,78 @@ void BloomFilter::ToThrift(const BloomFilter* filter, TBloomFilter* thrift) { filter->ToThrift(thrift); } +// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we +// compile with -fno-strict-aliasing. + +void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) { + // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is + // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second + // part of this method. 
+ uint32_t new_bucket[8] __attribute__((aligned(16))); + for (int i = 0; i < 8; ++i) { + // Rehash 'hash' and use the top LOG_BUCKET_WORD_BITS bits, following Dietzfelbinger. + new_bucket[i] = + (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS); + new_bucket[i] = 1U << new_bucket[i]; + } + for (int i = 0; i < 2; ++i) { + __m128i new_bucket_sse = + _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i)); + __m128i* existing_bucket = reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]); + *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse); + } +} + +__m256i BloomFilter::MakeMask(const uint32_t hash) { + const __m256i ones = _mm256_set1_epi32(1); + const __m256i rehash = _mm256_setr_epi32(IMPALA_BLOOM_HASH_CONSTANTS); + // Load hash into a YMM register, repeated eight times + __m256i hash_data = _mm256_set1_epi32(hash); + // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different + // odd constants, then keep the 5 most significant bits from each product. + hash_data = _mm256_mullo_epi32(rehash, hash_data); + hash_data = _mm256_srli_epi32(hash_data, 27); + // Use these 5 bits to shift a single bit to a location in each 32-bit lane + return _mm256_sllv_epi32(ones, hash_data); +} + +void BloomFilter::BucketInsertAVX2( + const uint32_t bucket_idx, const uint32_t hash) { + const __m256i mask = MakeMask(hash); + __m256i* const bucket = &reinterpret_cast<__m256i*>(directory_)[bucket_idx]; + _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask)); + // For SSE compatibility, unset the high bits of each YMM register so SSE instructions + // dont have to save them off before using XMM registers. 
+ _mm256_zeroupper(); +} + +bool BloomFilter::BucketFindAVX2( + const uint32_t bucket_idx, const uint32_t hash) const { + const __m256i mask = MakeMask(hash); + const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx]; + // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256 + // takes the negation of its first argument and ands that with its second argument. In + // our case, the result is zero everywhere iff there is a one in 'bucket' wherever + // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise. + const bool result = _mm256_testc_si256(bucket, mask); + _mm256_zeroupper(); + return result; +} + +bool BloomFilter::BucketFind( + const uint32_t bucket_idx, const uint32_t hash) const { + for (int i = 0; i < BUCKET_WORDS; ++i) { + BucketWord hval = + (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS); + hval = 1U << hval; + if (!(directory_[bucket_idx][i] & hval)) { + return false; + } + } + return true; +} + + void BloomFilter::Or(const TBloomFilter& in, TBloomFilter* out) { DCHECK(out != NULL); DCHECK_EQ(in.log_heap_space, out->log_heap_space); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c7fe4385/be/src/util/bloom-filter.h ---------------------------------------------------------------------- diff --git a/be/src/util/bloom-filter.h b/be/src/util/bloom-filter.h index 7a94995..4342814 100644 --- a/be/src/util/bloom-filter.h +++ b/be/src/util/bloom-filter.h @@ -25,9 +25,9 @@ #include <immintrin.h> -#include "gutil/macros.h" - +#include "common/compiler-util.h" #include "gen-cpp/ImpalaInternalService_types.h" +#include "gutil/macros.h" #include "runtime/buffered-block-mgr.h" namespace impala { @@ -173,7 +173,7 @@ class BloomFilter { // the advantage of requiring fewer random bits: log2(32) * 8 = 5 * 8 = 40 random bits for // a split Bloom filter, but log2(256) * 8 = 64 random bits for a standard Bloom filter. 
-inline void BloomFilter::Insert(const uint32_t hash) { +inline void ALWAYS_INLINE BloomFilter::Insert(const uint32_t hash) { const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_; if (CpuInfo::IsSupported(CpuInfo::AVX2)) { BucketInsertAVX2(bucket_idx, hash); @@ -182,7 +182,7 @@ inline void BloomFilter::Insert(const uint32_t hash) { } } -inline bool BloomFilter::Find(const uint32_t hash) const { +inline bool ALWAYS_INLINE BloomFilter::Find(const uint32_t hash) const { const uint32_t bucket_idx = HashUtil::Rehash32to32(hash) & directory_mask_; if (CpuInfo::IsSupported(CpuInfo::AVX2)) { return BucketFindAVX2(bucket_idx, hash); @@ -191,78 +191,6 @@ inline bool BloomFilter::Find(const uint32_t hash) const { } } -// The SIMD reinterpret_casts technically violate C++'s strict aliasing rules. However, we -// compile with -fno-strict-aliasing. - -inline void BloomFilter::BucketInsert(const uint32_t bucket_idx, const uint32_t hash) { - // new_bucket will be all zeros except for eight 1-bits, one in each 32-bit word. It is - // 16-byte aligned so it can be read as a __m128i using aligned SIMD loads in the second - // part of this method. - uint32_t new_bucket[8] __attribute__((aligned(16))); - for (int i = 0; i < 8; ++i) { - // Rehash 'hash' and use the top LOG_BUCKET_WORD_BITS bits, following Dietzfelbinger. 
- new_bucket[i] = - (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS); - new_bucket[i] = 1U << new_bucket[i]; - } - for (int i = 0; i < 2; ++i) { - __m128i new_bucket_sse = - _mm_load_si128(reinterpret_cast<__m128i*>(new_bucket + 4 * i)); - __m128i* existing_bucket = reinterpret_cast<__m128i*>(&directory_[bucket_idx][4 * i]); - *existing_bucket = _mm_or_si128(*existing_bucket, new_bucket_sse); - } -} - -inline __m256i BloomFilter::MakeMask(const uint32_t hash) { - const __m256i ones = _mm256_set1_epi32(1); - const __m256i rehash = _mm256_setr_epi32(IMPALA_BLOOM_HASH_CONSTANTS); - // Load hash into a YMM register, repeated eight times - __m256i hash_data = _mm256_set1_epi32(hash); - // Multiply-shift hashing ala Dietzfelbinger et al.: multiply 'hash' by eight different - // odd constants, then keep the 5 most significant bits from each product. - hash_data = _mm256_mullo_epi32(rehash, hash_data); - hash_data = _mm256_srli_epi32(hash_data, 27); - // Use these 5 bits to shift a single bit to a location in each 32-bit lane - return _mm256_sllv_epi32(ones, hash_data); -} - -inline void BloomFilter::BucketInsertAVX2( - const uint32_t bucket_idx, const uint32_t hash) { - const __m256i mask = MakeMask(hash); - __m256i* const bucket = &reinterpret_cast<__m256i*>(directory_)[bucket_idx]; - _mm256_store_si256(bucket, _mm256_or_si256(*bucket, mask)); - // For SSE compatibility, unset the high bits of each YMM register so SSE instructions - // dont have to save them off before using XMM registers. - _mm256_zeroupper(); -} - -inline bool BloomFilter::BucketFindAVX2( - const uint32_t bucket_idx, const uint32_t hash) const { - const __m256i mask = MakeMask(hash); - const __m256i bucket = reinterpret_cast<__m256i*>(directory_)[bucket_idx]; - // We should return true if 'bucket' has a one wherever 'mask' does. _mm256_testc_si256 - // takes the negation of its first argument and ands that with its second argument. 
In - // our case, the result is zero everywhere iff there is a one in 'bucket' wherever - // 'mask' is one. testc returns 1 if the result is 0 everywhere and returns 0 otherwise. - const bool result = _mm256_testc_si256(bucket, mask); - _mm256_zeroupper(); - return result; -} - -inline bool BloomFilter::BucketFind( - const uint32_t bucket_idx, const uint32_t hash) const { - for (int i = 0; i < BUCKET_WORDS; ++i) { - BucketWord hval = - (REHASH[i] * hash) >> ((1 << LOG_BUCKET_WORD_BITS) - LOG_BUCKET_WORD_BITS); - hval = 1U << hval; - if (!(directory_[bucket_idx][i] & hval)) { - return false; - } - } - return true; -} - } // namespace impala -#undef IMPALA_BLOOM_HASH_CONSTANTS #endif // IMPALA_UTIL_BLOOM_H
