Repository: impala Updated Branches: refs/heads/master 038af3459 -> 109028e89
Revert "IMPALA-7477: Batch-oriented query set construction" This has been implicated in some incorrect results involving nulls in HS2. This reverts commit b288a6af2eda9631b2bad91896ae4bfd2a3fdf30. Change-Id: I533c12f1b4cfc5b4a372ba834913975b5c52c5a8 Reviewed-on: http://gerrit.cloudera.org:8080/11462 Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com> Reviewed-by: Zoltan Borok-Nagy <borokna...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/109028e8 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/109028e8 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/109028e8 Branch: refs/heads/master Commit: 109028e89ce27880cc2ef69a7a5032c05df4d4df Parents: 038af34 Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Tue Sep 18 09:26:33 2018 -0700 Committer: Zoltan Borok-Nagy <borokna...@cloudera.com> Committed: Wed Sep 19 10:16:20 2018 +0000 ---------------------------------------------------------------------- be/src/exec/plan-root-sink.cc | 26 ++- be/src/exec/plan-root-sink.h | 4 + be/src/service/hs2-util.cc | 307 +++++++++----------------------- be/src/service/hs2-util.h | 15 +- be/src/service/query-result-set.cc | 115 +++++------- be/src/service/query-result-set.h | 14 +- 6 files changed, 161 insertions(+), 320 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/109028e8/be/src/exec/plan-root-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc index 1f5b2e5..a64dbb9 100644 --- a/be/src/exec/plan-root-sink.cc +++ b/be/src/exec/plan-root-sink.cc @@ -79,11 +79,22 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) { // Otherwise the consumer is ready. Fill out the rows. DCHECK(results_ != nullptr); + // List of expr values to hold evaluated rows from the query + vector<void*> result_row; + result_row.resize(output_exprs_.size()); + + // List of scales for floating point values in result_row + vector<int> scales; + scales.resize(result_row.size()); + int num_to_fetch = batch->num_rows() - current_batch_row; if (num_rows_requested_ > 0) num_to_fetch = min(num_to_fetch, num_rows_requested_); - RETURN_IF_ERROR( - results_->AddRows(output_expr_evals_, batch, current_batch_row, num_to_fetch)); - current_batch_row += num_to_fetch; + for (int i = 0; i < num_to_fetch; ++i) { + TupleRow* row = batch->GetRow(current_batch_row); + GetRowValue(row, &result_row, &scales); + RETURN_IF_ERROR(results_->AddOneRow(result_row, scales)); + ++current_batch_row; + } // Prevent expr result allocations from accumulating. expr_results_pool_->Clear(); // Signal the consumer. @@ -135,4 +146,13 @@ Status PlanRootSink::GetNext( *eos = sender_state_ == SenderState::EOS; return state->GetQueryStatus(); } + +void PlanRootSink::GetRowValue( + TupleRow* row, vector<void*>* result, vector<int>* scales) { + DCHECK_GE(result->size(), output_expr_evals_.size()); + for (int i = 0; i < output_expr_evals_.size(); ++i) { + (*result)[i] = output_expr_evals_[i]->GetValue(row); + (*scales)[i] = output_expr_evals_[i]->output_scale(); + } +} } http://git-wip-us.apache.org/repos/asf/impala/blob/109028e8/be/src/exec/plan-root-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h index 300c993..1d64b21 100644 --- a/be/src/exec/plan-root-sink.h +++ b/be/src/exec/plan-root-sink.h @@ -118,6 +118,10 @@ class PlanRootSink : public DataSink { /// Set by GetNext() to indicate to Send() how many rows it should write to results_. int num_rows_requested_ = 0; + + /// Writes a single row into 'result' and 'scales' by evaluating + /// output_expr_evals_ over 'row'. + void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales); }; } http://git-wip-us.apache.org/repos/asf/impala/blob/109028e8/be/src/service/hs2-util.cc ---------------------------------------------------------------------- diff --git a/be/src/service/hs2-util.cc b/be/src/service/hs2-util.cc index cf571fb..b856556 100644 --- a/be/src/service/hs2-util.cc +++ b/be/src/service/hs2-util.cc @@ -18,12 +18,9 @@ #include "service/hs2-util.h" #include "common/logging.h" -#include "exprs/scalar-expr-evaluator.h" #include "runtime/decimal-value.inline.h" #include "runtime/raw-value.inline.h" -#include "runtime/row-batch.h" #include "runtime/types.h" -#include "util/bit-util.h" #include <gutil/strings/substitute.h> @@ -52,9 +49,7 @@ inline bool GetNullBit(const string& nulls, uint32_t row_idx) { void impala::StitchNulls(uint32_t num_rows_before, uint32_t num_rows_added, uint32_t start_idx, const string& from, string* to) { - // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated - // small increases in size. - to->reserve(BitUtil::RoundUpToPowerOfTwo((num_rows_before + num_rows_added + 7) / 8)); + to->reserve((num_rows_before + num_rows_added + 7) / 8); // TODO: This is very inefficient, since we could conceivably go one byte at a time // (although the operands should stay live in registers in the loop). However doing this @@ -123,246 +118,106 @@ void impala::TColumnValueToHS2TColumn(const TColumnValue& col_val, SetNullBit(row_idx, is_null, nulls); } -// Specialised per-type implementations of ExprValuesToHS2TColumn. - -// Helper to reserve space in hs2Vals->values and hs2Vals->nulls for the values that the -// different implementations of ExprValuesToHS2TColumn will write. -template <typename T> -void ReserveSpace(int start_idx, int num_rows, uint32_t output_row_idx, T* hs2Vals) { - int64_t num_output_rows = output_row_idx + num_rows - start_idx; - int64_t num_null_bytes = BitUtil::RoundUpNumBytes(num_output_rows); - // Round up reserve() arguments to power-of-two to avoid accidentally quadratic - // behaviour from repeated small increases in size. - hs2Vals->values.reserve(BitUtil::RoundUpToPowerOfTwo(num_output_rows)); - hs2Vals->nulls.reserve(BitUtil::RoundUpToPowerOfTwo(num_null_bytes)); -} - -// Implementation for BOOL. -static void BoolExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, - int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->boolVal); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - BooleanVal val = expr_eval->GetBooleanVal(it.Get()); - column->boolVal.values.push_back(val.val); - SetNullBit(output_row_idx, val.is_null, &column->boolVal.nulls); - } -} - -// Implementation for TINYINT. -static void TinyIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, - int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->byteVal); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - TinyIntVal val = expr_eval->GetTinyIntVal(it.Get()); - column->byteVal.values.push_back(val.val); - SetNullBit(output_row_idx, val.is_null, &column->byteVal.nulls); - } -} - -// Implementation for SMALLINT. -static void SmallIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, - RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->i16Val); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - SmallIntVal val = expr_eval->GetSmallIntVal(it.Get()); - column->i16Val.values.push_back(val.val); - SetNullBit(output_row_idx, val.is_null, &column->i16Val.nulls); - } -} - -// Implementation for INT. -static void IntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, - int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->i32Val); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - IntVal val = expr_eval->GetIntVal(it.Get()); - column->i32Val.values.push_back(val.val); - SetNullBit(output_row_idx, val.is_null, &column->i32Val.nulls); - } -} - -// Implementation for BIGINT. -static void BigIntExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, - int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->i64Val); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - BigIntVal val = expr_eval->GetBigIntVal(it.Get()); - column->i64Val.values.push_back(val.val); - SetNullBit(output_row_idx, val.is_null, &column->i64Val.nulls); - } -} - -// Implementation for FLOAT. -static void FloatExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, - int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - FloatVal val = expr_eval->GetFloatVal(it.Get()); - column->doubleVal.values.push_back(val.val); - SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls); - } -} - -// Implementation for DOUBLE. -static void DoubleExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, - int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->doubleVal); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - DoubleVal val = expr_eval->GetDoubleVal(it.Get()); - column->doubleVal.values.push_back(val.val); - SetNullBit(output_row_idx, val.is_null, &column->doubleVal.nulls); - } -} - -// Implementation for TIMESTAMP. -static void TimestampExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, - RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - TimestampVal val = expr_eval->GetTimestampVal(it.Get()); - column->stringVal.values.emplace_back(); - if (!val.is_null) { - TimestampValue value = TimestampValue::FromTimestampVal(val); - RawValue::PrintValue( - &value, TYPE_TIMESTAMP, -1, &(column->stringVal.values.back())); - } - SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); - } -} - -// Implementation for STRING and VARCHAR. -static void StringExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, RowBatch* batch, - int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - StringVal val = expr_eval->GetStringVal(it.Get()); - if (val.is_null) { - column->stringVal.values.emplace_back(); - } else { - column->stringVal.values.emplace_back(reinterpret_cast<char*>(val.ptr), val.len); - } - SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); - } -} - -// Implementation for CHAR. -static void CharExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, - const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, - uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); - ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - StringVal val = expr_eval->GetStringVal(it.Get()); - if (val.is_null) { - column->stringVal.values.emplace_back(); - } else { - column->stringVal.values.emplace_back( - reinterpret_cast<const char*>(val.ptr), char_type.len); - } - SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); - } -} - -static void DecimalExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, - const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, - uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - ReserveSpace(start_idx, num_rows, output_row_idx, &column->stringVal); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - DecimalVal val = expr_eval->GetDecimalVal(it.Get()); - const ColumnType& decimalType = ColumnType::FromThrift(type); - if (val.is_null) { - column->stringVal.values.emplace_back(); - } else { - switch (decimalType.GetByteSize()) { - case 4: - column->stringVal.values.emplace_back( - Decimal4Value(val.val4).ToString(decimalType)); - break; - case 8: - column->stringVal.values.emplace_back( - Decimal8Value(val.val8).ToString(decimalType)); - break; - case 16: - column->stringVal.values.emplace_back( - Decimal16Value(val.val16).ToString(decimalType)); - break; - default: - DCHECK(false) << "bad type: " << decimalType; - } - } - SetNullBit(output_row_idx, val.is_null, &column->stringVal.nulls); - } -} - // For V6 and above -void impala::ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, - const TColumnType& type, RowBatch* batch, int start_idx, int num_rows, - uint32_t output_row_idx, apache::hive::service::cli::thrift::TColumn* column) { - // Dispatch to a templated function for the loop over rows. This avoids branching on - // the type for every row. - // TODO: instead of relying on stamped out implementations, we could codegen this loop - // to inline the expression evaluation into the loop body. +void impala::ExprValueToHS2TColumn(const void* value, const TColumnType& type, + uint32_t row_idx, thrift::TColumn* column) { + string* nulls; switch (type.types[0].scalar_type.type) { case TPrimitiveType::NULL_TYPE: case TPrimitiveType::BOOLEAN: - BoolExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->boolVal.values.push_back( + value == NULL ? false : *reinterpret_cast<const bool*>(value)); + nulls = &column->boolVal.nulls; + break; case TPrimitiveType::TINYINT: - TinyIntExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->byteVal.values.push_back( + value == NULL ? 0 : *reinterpret_cast<const int8_t*>(value)); + nulls = &column->byteVal.nulls; + break; case TPrimitiveType::SMALLINT: - SmallIntExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->i16Val.values.push_back( + value == NULL ? 0 : *reinterpret_cast<const int16_t*>(value)); + nulls = &column->i16Val.nulls; + break; case TPrimitiveType::INT: - IntExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->i32Val.values.push_back( + value == NULL ? 0 : *reinterpret_cast<const int32_t*>(value)); + nulls = &column->i32Val.nulls; + break; case TPrimitiveType::BIGINT: - BigIntExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->i64Val.values.push_back( + value == NULL ? 0 : *reinterpret_cast<const int64_t*>(value)); + nulls = &column->i64Val.nulls; + break; case TPrimitiveType::FLOAT: - FloatExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->doubleVal.values.push_back( + value == NULL ? 0.f : *reinterpret_cast<const float*>(value)); + nulls = &column->doubleVal.nulls; + break; case TPrimitiveType::DOUBLE: - DoubleExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->doubleVal.values.push_back( + value == NULL ? 0.0 : *reinterpret_cast<const double*>(value)); + nulls = &column->doubleVal.nulls; + break; case TPrimitiveType::TIMESTAMP: - TimestampExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->stringVal.values.push_back(""); + if (value != NULL) { + RawValue::PrintValue(value, TYPE_TIMESTAMP, -1, + &(column->stringVal.values.back())); + } + nulls = &column->stringVal.nulls; + break; case TPrimitiveType::STRING: case TPrimitiveType::VARCHAR: - StringExprValuesToHS2TColumn( - expr_eval, batch, start_idx, num_rows, output_row_idx, column); - return; + column->stringVal.values.push_back(""); + if (value != NULL) { + const StringValue* str_val = reinterpret_cast<const StringValue*>(value); + column->stringVal.values.back().assign( + static_cast<char*>(str_val->ptr), str_val->len); + } + nulls = &column->stringVal.nulls; + break; case TPrimitiveType::CHAR: - CharExprValuesToHS2TColumn( - expr_eval, type, batch, start_idx, num_rows, output_row_idx, column); - return; + column->stringVal.values.push_back(""); + if (value != NULL) { + ColumnType char_type = ColumnType::CreateCharType(type.types[0].scalar_type.len); + column->stringVal.values.back().assign( + reinterpret_cast<const char*>(value), char_type.len); + } + nulls = &column->stringVal.nulls; + break; case TPrimitiveType::DECIMAL: { - DecimalExprValuesToHS2TColumn( - expr_eval, type, batch, start_idx, num_rows, output_row_idx, column); - return; + // HiveServer2 requires decimal to be presented as string. + column->stringVal.values.push_back(""); + const ColumnType& decimalType = ColumnType::FromThrift(type); + if (value != NULL) { + switch (decimalType.GetByteSize()) { + case 4: + column->stringVal.values.back() = + reinterpret_cast<const Decimal4Value*>(value)->ToString(decimalType); + break; + case 8: + column->stringVal.values.back() = + reinterpret_cast<const Decimal8Value*>(value)->ToString(decimalType); + break; + case 16: + column->stringVal.values.back() = + reinterpret_cast<const Decimal16Value*>(value)->ToString(decimalType); + break; + default: + DCHECK(false) << "bad type: " << decimalType; + } + } + nulls = &column->stringVal.nulls; + break; } default: DCHECK(false) << "Unhandled type: " << TypeToString(ThriftToType(type.types[0].scalar_type.type)); + return; } + + SetNullBit(row_idx, (value == NULL), nulls); } // For V1 -> V5 http://git-wip-us.apache.org/repos/asf/impala/blob/109028e8/be/src/service/hs2-util.h ---------------------------------------------------------------------- diff --git a/be/src/service/hs2-util.h b/be/src/service/hs2-util.h index 4f0f973..44ceba6 100644 --- a/be/src/service/hs2-util.h +++ b/be/src/service/hs2-util.h @@ -20,23 +20,16 @@ namespace impala { -class RowBatch; -class ScalarExprEvaluator; - -/// Utility methods for converting from Impala (either an Expr result or a TColumnValue) -/// to Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->). +/// Utility methods for converting from Impala (either an Expr result or a TColumnValue) to +/// Hive types (either a thrift::TColumnValue (V1->V5) or a TColumn (V6->). /// For V6-> void TColumnValueToHS2TColumn(const TColumnValue& col_val, const TColumnType& type, uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column); -/// Evaluate 'expr_eval' over the row [start_idx, start_idx + num_rows) from 'batch' into -/// 'column' with 'type' starting at output_row_idx. The caller is responsible for -/// calling RuntimeState::GetQueryStatus() to check for expression evaluation errors. /// For V6-> -void ExprValuesToHS2TColumn(ScalarExprEvaluator* expr_eval, const TColumnType& type, - RowBatch* batch, int start_idx, int num_rows, uint32_t output_row_idx, - apache::hive::service::cli::thrift::TColumn* column); +void ExprValueToHS2TColumn(const void* value, const TColumnType& type, + uint32_t row_idx, apache::hive::service::cli::thrift::TColumn* column); /// For V1->V5 void TColumnValueToHS2TColumnValue(const TColumnValue& col_val, const TColumnType& type, http://git-wip-us.apache.org/repos/asf/impala/blob/109028e8/be/src/service/query-result-set.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc index f254176..aacd849 100644 --- a/be/src/service/query-result-set.cc +++ b/be/src/service/query-result-set.cc @@ -20,13 +20,10 @@ #include <sstream> #include <boost/scoped_ptr.hpp> -#include "exprs/scalar-expr-evaluator.h" #include "rpc/thrift-util.h" #include "runtime/raw-value.h" -#include "runtime/row-batch.h" #include "runtime/types.h" #include "service/hs2-util.h" -#include "util/bit-util.h" #include "common/names.h" @@ -54,19 +51,18 @@ class AsciiQueryResultSet : public QueryResultSet { virtual ~AsciiQueryResultSet() {} - /// Evaluate 'expr_evals' over rows in 'batch', convert to ASCII using "\t" as column + /// Convert one row's expr values stored in 'col_values' to ASCII using "\t" as column /// delimiter and store it in this result set. /// TODO: Handle complex types. - virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch, - int start_idx, int num_rows) override; + virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); /// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this /// result set. - virtual Status AddOneRow(const TResultRow& row) override; + virtual Status AddOneRow(const TResultRow& row); - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override; - virtual int64_t ByteSize(int start_idx, int num_rows) override; - virtual size_t size() override { return result_set_->size(); } + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); + virtual int64_t ByteSize(int start_idx, int num_rows); + virtual size_t size() { return result_set_->size(); } private: /// Metadata of the result set @@ -84,20 +80,18 @@ class HS2ColumnarResultSet : public QueryResultSet { virtual ~HS2ColumnarResultSet() {} - /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 columnar - /// representation. - virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch, - int start_idx, int num_rows) override; + /// Add a row of expr values + virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); /// Add a row from a TResultRow - virtual Status AddOneRow(const TResultRow& row) override; + virtual Status AddOneRow(const TResultRow& row); /// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows' /// from 'other' into this result set - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override; + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); - virtual int64_t ByteSize(int start_idx, int num_rows) override; - virtual size_t size() override { return num_rows_; } + virtual int64_t ByteSize(int start_idx, int num_rows); + virtual size_t size() { return num_rows_; } private: /// Metadata of the result set @@ -125,17 +119,15 @@ class HS2RowOrientedResultSet : public QueryResultSet { virtual ~HS2RowOrientedResultSet() {} - /// Evaluate 'expr_evals' over rows in 'batch' and convert to the HS2 row-oriented - /// representation of TRows stored in a TRowSet. - virtual Status AddRows(const vector<ScalarExprEvaluator*>& expr_evals, RowBatch* batch, - int start_idx, int num_rows) override; + /// Convert expr values to HS2 TRow and store it in a TRowSet. + virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); /// Convert TResultRow to HS2 TRow and store it in a TRowSet - virtual Status AddOneRow(const TResultRow& row) override; + virtual Status AddOneRow(const TResultRow& row); - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) override; - virtual int64_t ByteSize(int start_idx, int num_rows) override; - virtual size_t size() override { return result_set_->rows.size(); } + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); + virtual int64_t ByteSize(int start_idx, int num_rows); + virtual size_t size() { return result_set_->rows.size(); } private: /// Metadata of the result set @@ -166,34 +158,20 @@ QueryResultSet* QueryResultSet::CreateHS2ResultSet( ////////////////////////////////////////////////////////////////////////////////////////// -Status AsciiQueryResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals, - RowBatch* batch, int start_idx, int num_rows) { - DCHECK_GE(batch->num_rows(), start_idx + num_rows); - int num_col = expr_evals.size(); +Status AsciiQueryResultSet::AddOneRow( + const vector<void*>& col_values, const vector<int>& scales) { + int num_col = col_values.size(); DCHECK_EQ(num_col, metadata_.columns.size()); - vector<int> scales; - scales.reserve(num_col); - for (ScalarExprEvaluator* expr_eval : expr_evals) { - scales.push_back(expr_eval->output_scale()); - } - // Round up to power-of-two to avoid accidentally quadratic behaviour from repeated - // small increases in size. - result_set_->reserve( - BitUtil::RoundUpToPowerOfTwo(result_set_->size() + num_rows - start_idx)); stringstream out_stream; out_stream.precision(ASCII_PRECISION); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - for (int i = 0; i < num_col; ++i) { - // ODBC-187 - ODBC can only take "\t" as the delimiter - out_stream << (i > 0 ? "\t" : ""); - DCHECK_EQ(1, metadata_.columns[i].columnType.types.size()); - RawValue::PrintValue(expr_evals[i]->GetValue(it.Get()), - ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], - &out_stream); - } - result_set_->push_back(out_stream.str()); - out_stream.str(""); + for (int i = 0; i < num_col; ++i) { + // ODBC-187 - ODBC can only take "\t" as the delimiter + out_stream << (i > 0 ? "\t" : ""); + DCHECK_EQ(1, metadata_.columns[i].columnType.types.size()); + RawValue::PrintValue(col_values[i], + ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], &out_stream); } + result_set_->push_back(out_stream.str()); return Status::OK(); } @@ -285,18 +263,16 @@ HS2ColumnarResultSet::HS2ColumnarResultSet( InitColumns(); } -Status HS2ColumnarResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals, - RowBatch* batch, int start_idx, int num_rows) { - DCHECK_GE(batch->num_rows(), start_idx + num_rows); - int num_col = expr_evals.size(); +// Add a row of expr values +Status HS2ColumnarResultSet::AddOneRow( + const vector<void*>& col_values, const vector<int>& scales) { + int num_col = col_values.size(); DCHECK_EQ(num_col, metadata_.columns.size()); for (int i = 0; i < num_col; ++i) { - const TColumnType& type = metadata_.columns[i].columnType; - ScalarExprEvaluator* expr_eval = expr_evals[i]; - ExprValuesToHS2TColumn(expr_eval, type, batch, start_idx, num_rows, num_rows_, + ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_, &(result_set_->columns[i])); } - num_rows_ += num_rows; + ++num_rows_; return Status::OK(); } @@ -451,21 +427,16 @@ HS2RowOrientedResultSet::HS2RowOrientedResultSet( } } -Status HS2RowOrientedResultSet::AddRows(const vector<ScalarExprEvaluator*>& expr_evals, - RowBatch* batch, int start_idx, int num_rows) { - DCHECK_GE(batch->num_rows(), start_idx + num_rows); - int num_col = expr_evals.size(); +Status HS2RowOrientedResultSet::AddOneRow( + const vector<void*>& col_values, const vector<int>& scales) { + int num_col = col_values.size(); DCHECK_EQ(num_col, metadata_.columns.size()); - result_set_->rows.reserve( - BitUtil::RoundUpToPowerOfTwo(result_set_->rows.size() + num_rows - start_idx)); - FOREACH_ROW_LIMIT(batch, start_idx, num_rows, it) { - result_set_->rows.push_back(TRow()); - TRow& trow = result_set_->rows.back(); - trow.colVals.resize(num_col); - for (int i = 0; i < num_col; ++i) { - ExprValueToHS2TColumnValue(expr_evals[i]->GetValue(it.Get()), - metadata_.columns[i].columnType, &(trow.colVals[i])); - } + result_set_->rows.push_back(TRow()); + TRow& trow = result_set_->rows.back(); + trow.colVals.resize(num_col); + for (int i = 0; i < num_col; ++i) { + ExprValueToHS2TColumnValue( + col_values[i], metadata_.columns[i].columnType, &(trow.colVals[i])); } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/impala/blob/109028e8/be/src/service/query-result-set.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h index fa39d73..e0c88d7 100644 --- a/be/src/service/query-result-set.h +++ b/be/src/service/query-result-set.h @@ -27,9 +27,6 @@ namespace impala { -class RowBatch; -class ScalarExprEvaluator; - /// Wraps a client-API specific result representation, and implements the logic required /// to translate into that format from Impala's row format. /// @@ -39,11 +36,12 @@ class QueryResultSet { QueryResultSet() {} virtual ~QueryResultSet() {} - /// Add 'num_rows' rows to the result set, obtained by evaluating 'expr_evals' over - /// the rows in 'batch' starting at start_idx. Batch must contain at least - /// ('start_idx' + 'num_rows') rows. - virtual Status AddRows(const std::vector<ScalarExprEvaluator*>& expr_evals, - RowBatch* batch, int start_idx, int num_rows) = 0; + /// Add a single row to this result set. The row is a vector of pointers to values, + /// whose memory belongs to the caller. 'scales' contains the scales for decimal values + /// (# of digits after decimal), with -1 indicating no scale specified or the + /// corresponding value is not a decimal. + virtual Status AddOneRow( + const std::vector<void*>& row, const std::vector<int>& scales) = 0; /// Add the TResultRow to this result set. When a row comes from a DDL/metadata /// operation, the row in the form of TResultRow.