IMPALA-2905: Move QueryResultSet implementations into separate module. This mostly mechanical change moves the definition and implementation of the Beeswax and HS2-specific result sets into their own module. Result sets are now uniformly created by one of two factory methods, so the implementation is decoupled from the client.
Change-Id: I6ab883b62d3ec7012240edf8d56889349e7c0e32 Reviewed-on: http://gerrit.cloudera.org:8080/4736 Reviewed-by: Henry Robinson <he...@cloudera.com> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3f5380dc Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3f5380dc Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3f5380dc Branch: refs/heads/hadoop-next Commit: 3f5380dc73f3ab907443a2858d4fe0de6e3685e7 Parents: 080a678 Author: Henry Robinson <he...@cloudera.com> Authored: Sat Oct 15 16:47:24 2016 -0700 Committer: Internal Jenkins <cloudera-hud...@gerrit.cloudera.org> Committed: Tue Oct 18 09:30:09 2016 +0000 ---------------------------------------------------------------------- be/src/service/CMakeLists.txt | 1 + be/src/service/impala-beeswax-server.cc | 98 +----- be/src/service/impala-hs2-server.cc | 324 +----------------- be/src/service/query-result-set.cc | 478 +++++++++++++++++++++++++++ be/src/service/query-result-set.h | 21 +- 5 files changed, 503 insertions(+), 419 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index aa12ceb..35130ff 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -33,6 +33,7 @@ add_library(Service impala-beeswax-server.cc query-exec-state.cc query-options.cc + query-result-set.cc child-query.cc impalad-main.cc ) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/impala-beeswax-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-beeswax-server.cc 
b/be/src/service/impala-beeswax-server.cc index ee7f958..b50499e 100644 --- a/be/src/service/impala-beeswax-server.cc +++ b/be/src/service/impala-beeswax-server.cc @@ -47,100 +47,8 @@ using namespace beeswax; } \ } while (false) -namespace { - -/// Ascii output precision for double/float -constexpr int ASCII_PRECISION = 16; -} - namespace impala { -// Ascii result set for Beeswax. -// Beeswax returns rows in ascii, using "\t" as column delimiter. -class AsciiQueryResultSet : public QueryResultSet { - public: - // Rows are added into rowset. - AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset) - : metadata_(metadata), result_set_(rowset), owned_result_set_(NULL) { - } - - // Rows are added into a new rowset that is owned by this result set. - AsciiQueryResultSet(const TResultSetMetadata& metadata) - : metadata_(metadata), result_set_(new vector<string>()), - owned_result_set_(result_set_) { - } - - virtual ~AsciiQueryResultSet() { } - - // Convert expr values (col_values) to ASCII using "\t" as column delimiter and store - // it in this result set. - // TODO: Handle complex types. - virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) { - int num_col = col_values.size(); - DCHECK_EQ(num_col, metadata_.columns.size()); - stringstream out_stream; - out_stream.precision(ASCII_PRECISION); - for (int i = 0; i < num_col; ++i) { - // ODBC-187 - ODBC can only take "\t" as the delimiter - out_stream << (i > 0 ? "\t" : ""); - DCHECK_EQ(1, metadata_.columns[i].columnType.types.size()); - RawValue::PrintValue(col_values[i], - ColumnType::FromThrift(metadata_.columns[i].columnType), - scales[i], &out_stream); - } - result_set_->push_back(out_stream.str()); - return Status::OK(); - } - - // Convert TResultRow to ASCII using "\t" as column delimiter and store it in this - // result set. 
- virtual Status AddOneRow(const TResultRow& row) { - int num_col = row.colVals.size(); - DCHECK_EQ(num_col, metadata_.columns.size()); - stringstream out_stream; - out_stream.precision(ASCII_PRECISION); - for (int i = 0; i < num_col; ++i) { - // ODBC-187 - ODBC can only take "\t" as the delimiter - out_stream << (i > 0 ? "\t" : ""); - out_stream << row.colVals[i]; - } - result_set_->push_back(out_stream.str()); - return Status::OK(); - } - - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) { - const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other); - if (start_idx >= o->result_set_->size()) return 0; - const int rows_added = - min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx); - result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx, - o->result_set_->begin() + start_idx + rows_added); - return rows_added; - } - - virtual int64_t ByteSize(int start_idx, int num_rows) { - int64_t bytes = 0; - const int end = min(static_cast<size_t>(num_rows), result_set_->size() - start_idx); - for (int i = start_idx; i < start_idx + end; ++i) { - bytes += sizeof(result_set_[i]) + result_set_[i].capacity(); - } - return bytes; - } - - virtual size_t size() { return result_set_->size(); } - - private: - // Metadata of the result set - const TResultSetMetadata& metadata_; - - // Points to the result set to be filled. The result set this points to may be owned by - // this object, in which case owned_result_set_ is set. - vector<string>* result_set_; - - // Set to result_set_ if result_set_ is owned. 
- scoped_ptr<vector<string>> owned_result_set_; -}; - void ImpalaServer::query(QueryHandle& query_handle, const Query& query) { VLOG_QUERY << "query(): query=" << query.query; ScopedSessionState session_handle(this); @@ -588,9 +496,9 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, Status fetch_rows_status; query_results->data.clear(); if (!exec_state->eos()) { - AsciiQueryResultSet result_set(*(exec_state->result_metadata()), - &(query_results->data)); - fetch_rows_status = exec_state->FetchRows(fetch_size, &result_set); + scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet( + *exec_state->result_metadata(), &query_results->data)); + fetch_rows_status = exec_state->FetchRows(fetch_size, result_set.get()); } query_results->__set_has_more(!exec_state->eos()); query_results->__isset.data = true; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/impala-hs2-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc index de0e2f3..488a1ee 100644 --- a/be/src/service/impala-hs2-server.cc +++ b/be/src/service/impala-hs2-server.cc @@ -85,315 +85,10 @@ namespace impala { const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size"; -// Utility functions for computing the size of HS2 Thrift structs in bytes. -static inline -int64_t ByteSize(const thrift::TColumnValue& val) { - return sizeof(val) + val.stringVal.value.capacity(); -} - -static int64_t ByteSize(const thrift::TRow& row) { - int64_t bytes = sizeof(row); - for (const thrift::TColumnValue& c: row.colVals) { - bytes += ByteSize(c); - } - return bytes; -} - -// Returns the size, in bytes, of a Hive TColumn structure, only taking into account those -// values in the range [start_idx, end_idx). 
-static uint32_t TColumnByteSize(const thrift::TColumn& col, uint32_t start_idx, - uint32_t end_idx) { - DCHECK_LE(start_idx, end_idx); - uint32_t num_rows = end_idx - start_idx; - if (num_rows == 0) return 0L; - - if (col.__isset.boolVal) return (num_rows * sizeof(bool)) + col.boolVal.nulls.size(); - if (col.__isset.byteVal) return num_rows + col.byteVal.nulls.size(); - if (col.__isset.i16Val) return (num_rows * sizeof(int16_t)) + col.i16Val.nulls.size(); - if (col.__isset.i32Val) return (num_rows * sizeof(int32_t)) + col.i32Val.nulls.size(); - if (col.__isset.i64Val) return (num_rows * sizeof(int64_t)) + col.i64Val.nulls.size(); - if (col.__isset.doubleVal) { - return (num_rows * sizeof(double)) + col.doubleVal.nulls.size(); - } - if (col.__isset.stringVal) { - uint32_t bytes = 0; - for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size(); - return bytes + col.stringVal.nulls.size(); - } - - return 0; -} - // Helper function to translate between Beeswax and HiveServer2 type static TOperationState::type QueryStateToTOperationState( const beeswax::QueryState::type& query_state); -// Result set container for Hive protocol versions >= V6, where results are returned in -// column-orientation. 
-class HS2ColumnarResultSet : public QueryResultSet { - public: - HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL) - : metadata_(metadata), result_set_(rowset), num_rows_(0) { - if (rowset == NULL) { - owned_result_set_.reset(new TRowSet()); - result_set_ = owned_result_set_.get(); - } - InitColumns(); - } - - virtual ~HS2ColumnarResultSet() { } - - // Add a row of expr values - virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) { - int num_col = col_values.size(); - DCHECK_EQ(num_col, metadata_.columns.size()); - for (int i = 0; i < num_col; ++i) { - ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_, - &(result_set_->columns[i])); - } - ++num_rows_; - return Status::OK(); - } - - // Add a row from a TResultRow - virtual Status AddOneRow(const TResultRow& row) { - int num_col = row.colVals.size(); - DCHECK_EQ(num_col, metadata_.columns.size()); - for (int i = 0; i < num_col; ++i) { - TColumnValueToHS2TColumn(row.colVals[i], metadata_.columns[i].columnType, num_rows_, - &(result_set_->columns[i])); - } - ++num_rows_; - return Status::OK(); - } - - // Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows' - // from 'other' into this result set - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) { - const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other); - DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size()); - if (start_idx >= o->num_rows_) return 0; - const int rows_added = min<int64_t>(num_rows, o->num_rows_ - start_idx); - for (int j = 0; j < metadata_.columns.size(); ++j) { - thrift::TColumn* from = &o->result_set_->columns[j]; - thrift::TColumn* to = &result_set_->columns[j]; - switch (metadata_.columns[j].columnType.types[0].scalar_type.type) { - case TPrimitiveType::NULL_TYPE: - case TPrimitiveType::BOOLEAN: - StitchNulls(num_rows_, rows_added, start_idx, 
from->boolVal.nulls, - &(to->boolVal.nulls)); - to->boolVal.values.insert( - to->boolVal.values.end(), - from->boolVal.values.begin() + start_idx, - from->boolVal.values.begin() + start_idx + rows_added); - break; - case TPrimitiveType::TINYINT: - StitchNulls(num_rows_, rows_added, start_idx, from->byteVal.nulls, - &(to->byteVal.nulls)); - to->byteVal.values.insert( - to->byteVal.values.end(), - from->byteVal.values.begin() + start_idx, - from->byteVal.values.begin() + start_idx + rows_added); - break; - case TPrimitiveType::SMALLINT: - StitchNulls(num_rows_, rows_added, start_idx, from->i16Val.nulls, - &(to->i16Val.nulls)); - to->i16Val.values.insert( - to->i16Val.values.end(), - from->i16Val.values.begin() + start_idx, - from->i16Val.values.begin() + start_idx + rows_added); - break; - case TPrimitiveType::INT: - StitchNulls(num_rows_, rows_added, start_idx, from->i32Val.nulls, - &(to->i32Val.nulls)); - to->i32Val.values.insert( - to->i32Val.values.end(), - from->i32Val.values.begin() + start_idx, - from->i32Val.values.begin() + start_idx + rows_added); - break; - case TPrimitiveType::BIGINT: - StitchNulls(num_rows_, rows_added, start_idx, from->i64Val.nulls, - &(to->i64Val.nulls)); - to->i64Val.values.insert( - to->i64Val.values.end(), - from->i64Val.values.begin() + start_idx, - from->i64Val.values.begin() + start_idx + rows_added); - break; - case TPrimitiveType::FLOAT: - case TPrimitiveType::DOUBLE: - StitchNulls(num_rows_, rows_added, start_idx, from->doubleVal.nulls, - &(to->doubleVal.nulls)); - to->doubleVal.values.insert( - to->doubleVal.values.end(), - from->doubleVal.values.begin() + start_idx, - from->doubleVal.values.begin() + start_idx + rows_added); - break; - case TPrimitiveType::TIMESTAMP: - case TPrimitiveType::DECIMAL: - case TPrimitiveType::STRING: - case TPrimitiveType::VARCHAR: - case TPrimitiveType::CHAR: - StitchNulls(num_rows_, rows_added, start_idx, from->stringVal.nulls, - &(to->stringVal.nulls)); - 
to->stringVal.values.insert(to->stringVal.values.end(), - from->stringVal.values.begin() + start_idx, - from->stringVal.values.begin() + start_idx + rows_added); - break; - default: - DCHECK(false) << "Unsupported type: " << TypeToString(ThriftToType( - metadata_.columns[j].columnType.types[0].scalar_type.type)); - break; - } - } - num_rows_ += rows_added; - return rows_added; - } - - virtual int64_t ByteSize(int start_idx, int num_rows) { - const int end = min(start_idx + num_rows, (int)size()); - int64_t bytes = 0L; - for (const thrift::TColumn& c: result_set_->columns) { - bytes += TColumnByteSize(c, start_idx, end); - } - return bytes; - } - - virtual size_t size() { return num_rows_; } - - private: - // Metadata of the result set - const TResultSetMetadata& metadata_; - - // Points to the TRowSet to be filled. The row set this points to may be owned by - // this object, in which case owned_result_set_ is set. - TRowSet* result_set_; - - // Set to result_set_ if result_set_ is owned. 
- scoped_ptr<TRowSet> owned_result_set_; - - int64_t num_rows_; - - void InitColumns() { - result_set_->__isset.columns = true; - for (const TColumn& col: metadata_.columns) { - DCHECK(col.columnType.types.size() == 1) << - "Structured columns unsupported in HS2 interface"; - thrift::TColumn column; - switch (col.columnType.types[0].scalar_type.type) { - case TPrimitiveType::NULL_TYPE: - case TPrimitiveType::BOOLEAN: - column.__isset.boolVal = true; - break; - case TPrimitiveType::TINYINT: - column.__isset.byteVal = true; - break; - case TPrimitiveType::SMALLINT: - column.__isset.i16Val = true; - break; - case TPrimitiveType::INT: - column.__isset.i32Val = true; - break; - case TPrimitiveType::BIGINT: - column.__isset.i64Val = true; - break; - case TPrimitiveType::FLOAT: - case TPrimitiveType::DOUBLE: - column.__isset.doubleVal = true; - break; - case TPrimitiveType::TIMESTAMP: - case TPrimitiveType::DECIMAL: - case TPrimitiveType::VARCHAR: - case TPrimitiveType::CHAR: - case TPrimitiveType::STRING: - column.__isset.stringVal = true; - break; - default: - DCHECK(false) << "Unhandled column type: " - << TypeToString( - ThriftToType(col.columnType.types[0].scalar_type.type)); - } - result_set_->columns.push_back(column); - } - } -}; - -// TRow result set for HiveServer2 -class HS2RowOrientedResultSet : public QueryResultSet { - public: - // Rows are added into rowset. - HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL) - : metadata_(metadata), result_set_(rowset) { - if (rowset == NULL) { - owned_result_set_.reset(new TRowSet()); - result_set_ = owned_result_set_.get(); - } - } - - virtual ~HS2RowOrientedResultSet() { } - - // Convert expr value to HS2 TRow and store it in TRowSet. 
- virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales) { - int num_col = col_values.size(); - DCHECK_EQ(num_col, metadata_.columns.size()); - result_set_->rows.push_back(TRow()); - TRow& trow = result_set_->rows.back(); - trow.colVals.resize(num_col); - for (int i = 0; i < num_col; ++i) { - ExprValueToHS2TColumnValue(col_values[i], - metadata_.columns[i].columnType, &(trow.colVals[i])); - } - return Status::OK(); - } - - // Convert TResultRow to HS2 TRow and store it in TRowSet. - virtual Status AddOneRow(const TResultRow& row) { - int num_col = row.colVals.size(); - DCHECK_EQ(num_col, metadata_.columns.size()); - result_set_->rows.push_back(TRow()); - TRow& trow = result_set_->rows.back(); - trow.colVals.resize(num_col); - for (int i = 0; i < num_col; ++i) { - TColumnValueToHS2TColumnValue(row.colVals[i], metadata_.columns[i].columnType, - &(trow.colVals[i])); - } - return Status::OK(); - } - - virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) { - const HS2RowOrientedResultSet* o = static_cast<const HS2RowOrientedResultSet*>(other); - if (start_idx >= o->result_set_->rows.size()) return 0; - const int rows_added = - min(static_cast<size_t>(num_rows), o->result_set_->rows.size() - start_idx); - for (int i = start_idx; i < start_idx + rows_added; ++i) { - result_set_->rows.push_back(o->result_set_->rows[i]); - } - return rows_added; - } - - virtual int64_t ByteSize(int start_idx, int num_rows) { - int64_t bytes = 0; - const int end = - min(static_cast<size_t>(num_rows), result_set_->rows.size() - start_idx); - for (int i = start_idx; i < start_idx + end; ++i) { - bytes += impala::ByteSize(result_set_->rows[i]); - } - return bytes; - } - - virtual size_t size() { return result_set_->rows.size(); } - - private: - // Metadata of the result set - const TResultSetMetadata& metadata_; - - // Points to the TRowSet to be filled. 
The row set this points to may be owned by - // this object, in which case owned_result_set_ is set. - TRowSet* result_set_; - - // Set to result_set_ if result_set_ is owned. - scoped_ptr<TRowSet> owned_result_set_; -}; - void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) { TUniqueId session_id; @@ -473,18 +168,6 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle, status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS); } -namespace { - -QueryResultSet* CreateHS2ResultSet( - TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) { - if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) { - return new HS2RowOrientedResultSet(metadata, rowset); - } else { - return new HS2ColumnarResultSet(metadata, rowset); - } -} -} - Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size, bool fetch_first, TFetchResultsResp* fetch_results) { shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false); @@ -522,8 +205,8 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size bool is_child_query = exec_state->parent_query_id() != TUniqueId(); TProtocolVersion::type version = is_child_query ? 
TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1 : session->hs2_version; - scoped_ptr<QueryResultSet> result_set(CreateHS2ResultSet(version, - *(exec_state->result_metadata()), &(fetch_results->results))); + scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateHS2ResultSet( + version, *(exec_state->result_metadata()), &(fetch_results->results))); RETURN_IF_ERROR(exec_state->FetchRows(fetch_size, result_set.get())); fetch_results->__isset.results = true; fetch_results->__set_hasMoreRows(!exec_state->eos()); @@ -763,7 +446,8 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val, // Optionally enable result caching on the QueryExecState. if (cache_num_rows > 0) { status = exec_state->SetResultCache( - CreateHS2ResultSet(session->hs2_version, *exec_state->result_metadata(), nullptr), + QueryResultSet::CreateHS2ResultSet( + session->hs2_version, *exec_state->result_metadata(), nullptr), cache_num_rows); if (!status.ok()) { UnregisterQuery(exec_state->query_id(), false, &status); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/query-result-set.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-result-set.cc b/be/src/service/query-result-set.cc new file mode 100644 index 0000000..3b17af7 --- /dev/null +++ b/be/src/service/query-result-set.cc @@ -0,0 +1,478 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "service/query-result-set.h" + +#include <sstream> +#include <boost/scoped_ptr.hpp> + +#include "rpc/thrift-util.h" +#include "runtime/raw-value.h" +#include "runtime/types.h" +#include "service/hs2-util.h" + +#include "common/names.h" + +using ThriftTColumn = apache::hive::service::cli::thrift::TColumn; +using ThriftTColumnValue = apache::hive::service::cli::thrift::TColumnValue; +using apache::hive::service::cli::thrift::TProtocolVersion; +using apache::hive::service::cli::thrift::TRow; +using apache::hive::service::cli::thrift::TRowSet; + +namespace { + +/// Ascii output precision for double/float +constexpr int ASCII_PRECISION = 16; +} + +namespace impala { + +/// Ascii result set for Beeswax. Rows are returned in ascii text encoding, using "\t" as +/// column delimiter. +class AsciiQueryResultSet : public QueryResultSet { + public: + /// Rows are added into 'rowset'. + AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset) + : metadata_(metadata), result_set_(rowset) {} + + virtual ~AsciiQueryResultSet() {} + + /// Convert one row's expr values stored in 'col_values' to ASCII using "\t" as column + /// delimiter and store it in this result set. + /// TODO: Handle complex types. + virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); + + /// Convert TResultRow to ASCII using "\t" as column delimiter and store it in this + /// result set. 
+ virtual Status AddOneRow(const TResultRow& row); + + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); + virtual int64_t ByteSize(int start_idx, int num_rows); + virtual size_t size() { return result_set_->size(); } + + private: + /// Metadata of the result set + const TResultSetMetadata& metadata_; + + /// Points to the result set to be filled. Not owned by this object. + vector<string>* result_set_; +}; + +/// Result set container for Hive protocol versions >= V6, where results are returned in +/// column-orientation. +class HS2ColumnarResultSet : public QueryResultSet { + public: + HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset); + + virtual ~HS2ColumnarResultSet(){}; + + /// Add a row of expr values + virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); + + /// Add a row from a TResultRow + virtual Status AddOneRow(const TResultRow& row); + + /// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows' + /// from 'other' into this result set + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); + + virtual int64_t ByteSize(int start_idx, int num_rows); + virtual size_t size() { return num_rows_; } + + private: + /// Metadata of the result set + const TResultSetMetadata& metadata_; + + /// Points to the TRowSet to be filled. The row set + /// this points to may be owned by + /// this object, in which case owned_result_set_ is set. + TRowSet* result_set_; + + /// Set to result_set_ if result_set_ is owned. + boost::scoped_ptr<TRowSet> owned_result_set_; + + int64_t num_rows_; + + void InitColumns(); +}; + +/// Row oriented result set for HiveServer2, used to serve HS2 requests with protocol +/// version <= V5. +class HS2RowOrientedResultSet : public QueryResultSet { + public: + /// Rows are added into rowset. 
+ HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset); + + virtual ~HS2RowOrientedResultSet() {} + + /// Convert expr values to HS2 TRow and store it in a TRowSet. + virtual Status AddOneRow(const vector<void*>& col_values, const vector<int>& scales); + + /// Convert TResultRow to HS2 TRow and store it in a TRowSet + virtual Status AddOneRow(const TResultRow& row); + + virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows); + virtual int64_t ByteSize(int start_idx, int num_rows); + virtual size_t size() { return result_set_->rows.size(); } + + private: + /// Metadata of the result set + const TResultSetMetadata& metadata_; + + /// Points to the TRowSet to be filled. The row set + /// this points to may be owned by + /// this object, in which case owned_result_set_ is set. + TRowSet* result_set_; + + /// Set to result_set_ if result_set_ is owned. + scoped_ptr<TRowSet> owned_result_set_; +}; + +QueryResultSet* QueryResultSet::CreateAsciiQueryResultSet( + const TResultSetMetadata& metadata, vector<string>* rowset) { + return new AsciiQueryResultSet(metadata, rowset); +} + +QueryResultSet* QueryResultSet::CreateHS2ResultSet( + TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) { + if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) { + return new HS2RowOrientedResultSet(metadata, rowset); + } else { + return new HS2ColumnarResultSet(metadata, rowset); + } +} + +////////////////////////////////////////////////////////////////////////////////////////// + +Status AsciiQueryResultSet::AddOneRow( + const vector<void*>& col_values, const vector<int>& scales) { + int num_col = col_values.size(); + DCHECK_EQ(num_col, metadata_.columns.size()); + stringstream out_stream; + out_stream.precision(ASCII_PRECISION); + for (int i = 0; i < num_col; ++i) { + // ODBC-187 - ODBC can only take "\t" as the delimiter + out_stream << (i > 0 ? 
"\t" : ""); + DCHECK_EQ(1, metadata_.columns[i].columnType.types.size()); + RawValue::PrintValue(col_values[i], + ColumnType::FromThrift(metadata_.columns[i].columnType), scales[i], &out_stream); + } + result_set_->push_back(out_stream.str()); + return Status::OK(); +} + +Status AsciiQueryResultSet::AddOneRow(const TResultRow& row) { + int num_col = row.colVals.size(); + DCHECK_EQ(num_col, metadata_.columns.size()); + stringstream out_stream; + out_stream.precision(ASCII_PRECISION); + for (int i = 0; i < num_col; ++i) { + // ODBC-187 - ODBC can only take "\t" as the delimiter + out_stream << (i > 0 ? "\t" : ""); + out_stream << row.colVals[i]; + } + result_set_->push_back(out_stream.str()); + return Status::OK(); +} + +int AsciiQueryResultSet::AddRows( + const QueryResultSet* other, int start_idx, int num_rows) { + const AsciiQueryResultSet* o = static_cast<const AsciiQueryResultSet*>(other); + if (start_idx >= o->result_set_->size()) return 0; + const int rows_added = + min(static_cast<size_t>(num_rows), o->result_set_->size() - start_idx); + result_set_->insert(result_set_->end(), o->result_set_->begin() + start_idx, + o->result_set_->begin() + start_idx + rows_added); + return rows_added; +} + +int64_t AsciiQueryResultSet::ByteSize(int start_idx, int num_rows) { + int64_t bytes = 0; + const int end = min(static_cast<size_t>(num_rows), result_set_->size() - start_idx); + for (int i = start_idx; i < start_idx + end; ++i) { + bytes += sizeof(result_set_[i]) + result_set_[i].capacity(); + } + return bytes; +} + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +// Utility functions for computing the size of HS2 Thrift structs in bytes. 
+inline int64_t ByteSize(const ThriftTColumnValue& val) { + return sizeof(val) + val.stringVal.value.capacity(); +} + +int64_t ByteSize(const TRow& row) { + int64_t bytes = sizeof(row); + for (const ThriftTColumnValue& c : row.colVals) { + bytes += ByteSize(c); + } + return bytes; +} + +// Returns the size, in bytes, of a Hive TColumn structure, only taking into account those +// values in the range [start_idx, end_idx). +uint32_t TColumnByteSize(const ThriftTColumn& col, uint32_t start_idx, uint32_t end_idx) { + DCHECK_LE(start_idx, end_idx); + uint32_t num_rows = end_idx - start_idx; + if (num_rows == 0) return 0L; + + if (col.__isset.boolVal) return (num_rows * sizeof(bool)) + col.boolVal.nulls.size(); + if (col.__isset.byteVal) return num_rows + col.byteVal.nulls.size(); + if (col.__isset.i16Val) return (num_rows * sizeof(int16_t)) + col.i16Val.nulls.size(); + if (col.__isset.i32Val) return (num_rows * sizeof(int32_t)) + col.i32Val.nulls.size(); + if (col.__isset.i64Val) return (num_rows * sizeof(int64_t)) + col.i64Val.nulls.size(); + if (col.__isset.doubleVal) { + return (num_rows * sizeof(double)) + col.doubleVal.nulls.size(); + } + if (col.__isset.stringVal) { + uint32_t bytes = 0; + for (int i = start_idx; i < end_idx; ++i) bytes += col.stringVal.values[i].size(); + return bytes + col.stringVal.nulls.size(); + } + + return 0; +} +} + +// Result set container for Hive protocol versions >= V6, where results are returned in +// column-orientation. 
+// Builds a column-oriented HS2 result set described by 'metadata'. If 'rowset' is
+// null, a TRowSet is allocated and owned by this object; otherwise results are
+// written into the caller-supplied 'rowset'. One output column is then set up per
+// metadata column (see InitColumns()).
+HS2ColumnarResultSet::HS2ColumnarResultSet(
+    const TResultSetMetadata& metadata, TRowSet* rowset)
+  : metadata_(metadata), result_set_(rowset), num_rows_(0) {
+  if (rowset == nullptr) {
+    owned_result_set_.reset(new TRowSet());
+    result_set_ = owned_result_set_.get();
+  }
+  InitColumns();
+}
+
+// Add a row of expr values. Each value is passed to ExprValueToHS2TColumn() together
+// with the type of the matching metadata column and the current row count, then the
+// row count is incremented. 'scales' is not consulted by this implementation.
+// Always returns Status::OK().
+Status HS2ColumnarResultSet::AddOneRow(
+    const vector<void*>& col_values, const vector<int>& scales) {
+  int num_col = col_values.size();
+  DCHECK_EQ(num_col, metadata_.columns.size());
+  for (int i = 0; i < num_col; ++i) {
+    ExprValueToHS2TColumn(col_values[i], metadata_.columns[i].columnType, num_rows_,
+        &(result_set_->columns[i]));
+  }
+  ++num_rows_;
+  return Status::OK();
+}
+
+// Add a row from a TResultRow. Same flow as the expr-value overload, but each column
+// value is converted with TColumnValueToHS2TColumn(). Always returns Status::OK().
+Status HS2ColumnarResultSet::AddOneRow(const TResultRow& row) {
+  int num_col = row.colVals.size();
+  DCHECK_EQ(num_col, metadata_.columns.size());
+  for (int i = 0; i < num_col; ++i) {
+    TColumnValueToHS2TColumn(row.colVals[i], metadata_.columns[i].columnType, num_rows_,
+        &(result_set_->columns[i]));
+  }
+  ++num_rows_;
+  return Status::OK();
+}
+
+// Copy all columns starting at 'start_idx' and proceeding for a maximum of 'num_rows'
+// from 'other' into this result set.
+//
+// 'other' is downcast without a runtime check, so it must be an HS2ColumnarResultSet.
+// Returns the number of rows copied, which is 0 when 'start_idx' is at or past the
+// last row of 'other'.
+int HS2ColumnarResultSet::AddRows(
+    const QueryResultSet* other, int start_idx, int num_rows) {
+  const HS2ColumnarResultSet* o = static_cast<const HS2ColumnarResultSet*>(other);
+  DCHECK_EQ(metadata_.columns.size(), o->metadata_.columns.size());
+  if (start_idx >= o->num_rows_) return 0;
+  const int rows_added = min<int64_t>(num_rows, o->num_rows_ - start_idx);
+  for (size_t j = 0; j < metadata_.columns.size(); ++j) {
+    const ThriftTColumn* from = &o->result_set_->columns[j];
+    ThriftTColumn* to = &result_set_->columns[j];
+    // For every type: merge the null indicators first, then append the value slice
+    // [start_idx, start_idx + rows_added) to the matching typed column.
+    switch (metadata_.columns[j].columnType.types[0].scalar_type.type) {
+      case TPrimitiveType::NULL_TYPE:
+      case TPrimitiveType::BOOLEAN:
+        StitchNulls(
+            num_rows_, rows_added, start_idx, from->boolVal.nulls, &(to->boolVal.nulls));
+        to->boolVal.values.insert(to->boolVal.values.end(),
+            from->boolVal.values.begin() + start_idx,
+            from->boolVal.values.begin() + start_idx + rows_added);
+        break;
+      case TPrimitiveType::TINYINT:
+        StitchNulls(
+            num_rows_, rows_added, start_idx, from->byteVal.nulls, &(to->byteVal.nulls));
+        to->byteVal.values.insert(to->byteVal.values.end(),
+            from->byteVal.values.begin() + start_idx,
+            from->byteVal.values.begin() + start_idx + rows_added);
+        break;
+      case TPrimitiveType::SMALLINT:
+        StitchNulls(
+            num_rows_, rows_added, start_idx, from->i16Val.nulls, &(to->i16Val.nulls));
+        to->i16Val.values.insert(to->i16Val.values.end(),
+            from->i16Val.values.begin() + start_idx,
+            from->i16Val.values.begin() + start_idx + rows_added);
+        break;
+      case TPrimitiveType::INT:
+        StitchNulls(
+            num_rows_, rows_added, start_idx, from->i32Val.nulls, &(to->i32Val.nulls));
+        to->i32Val.values.insert(to->i32Val.values.end(),
+            from->i32Val.values.begin() + start_idx,
+            from->i32Val.values.begin() + start_idx + rows_added);
+        break;
+      case TPrimitiveType::BIGINT:
+        StitchNulls(
+            num_rows_, rows_added, start_idx, from->i64Val.nulls, &(to->i64Val.nulls));
+        to->i64Val.values.insert(to->i64Val.values.end(),
+            from->i64Val.values.begin() + start_idx,
+            from->i64Val.values.begin() + start_idx + rows_added);
+        break;
+      case TPrimitiveType::FLOAT:
+      case TPrimitiveType::DOUBLE:
+        StitchNulls(num_rows_, rows_added, start_idx, from->doubleVal.nulls,
+            &(to->doubleVal.nulls));
+        to->doubleVal.values.insert(to->doubleVal.values.end(),
+            from->doubleVal.values.begin() + start_idx,
+            from->doubleVal.values.begin() + start_idx + rows_added);
+        break;
+      case TPrimitiveType::TIMESTAMP:
+      case TPrimitiveType::DECIMAL:
+      case TPrimitiveType::STRING:
+      case TPrimitiveType::VARCHAR:
+      case TPrimitiveType::CHAR:
+        StitchNulls(num_rows_, rows_added, start_idx, from->stringVal.nulls,
+            &(to->stringVal.nulls));
+        to->stringVal.values.insert(to->stringVal.values.end(),
+            from->stringVal.values.begin() + start_idx,
+            from->stringVal.values.begin() + start_idx + rows_added);
+        break;
+      default:
+        DCHECK(false) << "Unsupported type: "
+                      << TypeToString(ThriftToType(
+                             metadata_.columns[j].columnType.types[0].scalar_type.type));
+        break;
+    }
+  }
+  num_rows_ += rows_added;
+  return rows_added;
+}
+
+// Returns the sum of TColumnByteSize() over every column for the row range that
+// starts at 'start_idx' and covers at most 'num_rows' rows, clamped to size().
+int64_t HS2ColumnarResultSet::ByteSize(int start_idx, int num_rows) {
+  // Add in 64 bits: 'start_idx + num_rows' can overflow int when a caller passes a
+  // very large 'num_rows' to mean "everything that is left".
+  const int end = static_cast<int>(min<int64_t>(
+      static_cast<int64_t>(start_idx) + num_rows, static_cast<int64_t>(size())));
+  int64_t bytes = 0L;
+  for (const ThriftTColumn& c : result_set_->columns) {
+    bytes += TColumnByteSize(c, start_idx, end);
+  }
+  return bytes;
+}
+
+// Appends one ThriftTColumn to the row set per metadata column and marks which typed
+// member of that column is in use. NULL_TYPE shares boolVal with BOOLEAN, FLOAT shares
+// doubleVal with DOUBLE, and TIMESTAMP / DECIMAL / VARCHAR / CHAR / STRING all use
+// stringVal.
+void HS2ColumnarResultSet::InitColumns() {
+  result_set_->__isset.columns = true;
+  for (const TColumn& col : metadata_.columns) {
+    DCHECK(col.columnType.types.size() == 1)
+        << "Structured columns unsupported in HS2 interface";
+    ThriftTColumn column;
+    switch (col.columnType.types[0].scalar_type.type) {
+      case TPrimitiveType::NULL_TYPE:
+      case TPrimitiveType::BOOLEAN:
+        column.__isset.boolVal = true;
+        break;
+      case TPrimitiveType::TINYINT:
+        column.__isset.byteVal = true;
+        break;
+      case TPrimitiveType::SMALLINT:
+        column.__isset.i16Val = true;
+        break;
+      case TPrimitiveType::INT:
+        column.__isset.i32Val = true;
+        break;
+      case TPrimitiveType::BIGINT:
+        column.__isset.i64Val = true;
+        break;
+      case TPrimitiveType::FLOAT:
+      case TPrimitiveType::DOUBLE:
+        column.__isset.doubleVal = true;
+        break;
+      case TPrimitiveType::TIMESTAMP:
+      case TPrimitiveType::DECIMAL:
+      case TPrimitiveType::VARCHAR:
+      case TPrimitiveType::CHAR:
+      case TPrimitiveType::STRING:
+        column.__isset.stringVal = true;
+        break;
+      default:
+        DCHECK(false) << "Unhandled column type: "
+                      << TypeToString(
+                             ThriftToType(col.columnType.types[0].scalar_type.type));
+        break;
+    }
+    result_set_->columns.push_back(column);
+  }
+}
+
+// Builds a row-oriented HS2 result set described by 'metadata'. If 'rowset' is null,
+// a TRowSet is allocated and owned by this object; otherwise rows are appended to the
+// caller-supplied 'rowset'.
+HS2RowOrientedResultSet::HS2RowOrientedResultSet(
+    const TResultSetMetadata& metadata, TRowSet* rowset)
+  : metadata_(metadata), result_set_(rowset) {
+  if (rowset == nullptr) {
+    owned_result_set_.reset(new TRowSet());
+    result_set_ = owned_result_set_.get();
+  }
+}
+
+// Appends one TRow built from expr values, converting each value with
+// ExprValueToHS2TColumnValue() according to the matching metadata column type.
+// 'scales' is not consulted by this implementation. Always returns Status::OK().
+Status HS2RowOrientedResultSet::AddOneRow(
+    const vector<void*>& col_values, const vector<int>& scales) {
+  int num_col = col_values.size();
+  DCHECK_EQ(num_col, metadata_.columns.size());
+  result_set_->rows.push_back(TRow());
+  TRow& trow = result_set_->rows.back();
+  trow.colVals.resize(num_col);
+  for (int i = 0; i < num_col; ++i) {
+    ExprValueToHS2TColumnValue(
+        col_values[i], metadata_.columns[i].columnType, &(trow.colVals[i]));
+  }
+  return Status::OK();
+}
+
+// Appends one TRow built from a TResultRow, converting each column value with
+// TColumnValueToHS2TColumnValue(). Always returns Status::OK().
+Status HS2RowOrientedResultSet::AddOneRow(const TResultRow& row) {
+  int num_col = row.colVals.size();
+  DCHECK_EQ(num_col, metadata_.columns.size());
+  result_set_->rows.push_back(TRow());
+  TRow& trow = result_set_->rows.back();
+  trow.colVals.resize(num_col);
+  for (int i = 0; i < num_col; ++i) {
+    TColumnValueToHS2TColumnValue(
+        row.colVals[i], metadata_.columns[i].columnType, &(trow.colVals[i]));
+  }
+  return Status::OK();
+}
+
+// Copies up to 'num_rows' rows of 'other', starting at 'start_idx', onto the end of
+// this result set. 'other' is downcast without a runtime check, so it must be an
+// HS2RowOrientedResultSet. Returns the number of rows copied, which is 0 when
+// 'start_idx' is at or past the end of 'other' (a negative 'start_idx' converts to a
+// huge unsigned value and therefore also yields 0).
+int HS2RowOrientedResultSet::AddRows(
+    const QueryResultSet* other, int start_idx, int num_rows) {
+  const HS2RowOrientedResultSet* o = static_cast<const HS2RowOrientedResultSet*>(other);
+  if (static_cast<size_t>(start_idx) >= o->result_set_->rows.size()) return 0;
+  const int rows_added =
+      min(static_cast<size_t>(num_rows), o->result_set_->rows.size() - start_idx);
+  for (int i = start_idx; i < start_idx + rows_added; ++i) {
+    result_set_->rows.push_back(o->result_set_->rows[i]);
+  }
+  return rows_added;
+}
+
+// Returns the sum of impala::ByteSize() over at most 'num_rows' rows starting at
+// 'start_idx'. Returns 0 when 'start_idx' is at or past the end of the row set.
+int64_t HS2RowOrientedResultSet::ByteSize(int start_idx, int num_rows) {
+  // Same guard as AddRows(). Without it 'rows.size() - start_idx' below wraps around
+  // (unsigned arithmetic) whenever start_idx > rows.size(), and the loop would then
+  // index past the end of 'rows'.
+  if (static_cast<size_t>(start_idx) >= result_set_->rows.size()) return 0;
+  const int num_rows_to_count =
+      min(static_cast<size_t>(num_rows), result_set_->rows.size() - start_idx);
+  int64_t bytes = 0;
+  for (int i = start_idx; i < start_idx + num_rows_to_count; ++i) {
+    bytes += impala::ByteSize(result_set_->rows[i]);
+  }
+  return bytes;
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3f5380dc/be/src/service/query-result-set.h
----------------------------------------------------------------------
diff --git
a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h index b444ca3..e0c88d7 100644 --- a/be/src/service/query-result-set.h +++ b/be/src/service/query-result-set.h @@ -20,15 +20,17 @@ #include "common/status.h" #include "gen-cpp/Data_types.h" +#include "gen-cpp/Results_types.h" +#include "gen-cpp/TCLIService_types.h" #include <vector> namespace impala { -/// Stores client-ready query result rows returned by -/// QueryExecState::FetchRows(). Subclasses implement AddRows() / AddOneRow() to -/// specialise how Impala's row batches are converted to client-API result -/// representations. +/// Wraps a client-API specific result representation, and implements the logic required +/// to translate into that format from Impala's row format. +/// +/// Subclasses implement AddRows() / AddOneRow() to specialise that logic. class QueryResultSet { public: QueryResultSet() {} @@ -58,6 +60,17 @@ class QueryResultSet { /// Returns the size of this result set in number of rows. virtual size_t size() = 0; + + /// Returns a result set suitable for Beeswax-based clients. + static QueryResultSet* CreateAsciiQueryResultSet( + const TResultSetMetadata& metadata, std::vector<std::string>* rowset); + + /// Returns a result set suitable for HS2-based clients. If 'rowset' is nullptr, the + /// returned object will allocate and manage its own rowset. + static QueryResultSet* CreateHS2ResultSet( + apache::hive::service::cli::thrift::TProtocolVersion::type version, + const TResultSetMetadata& metadata, + apache::hive::service::cli::thrift::TRowSet* rowset); }; }