http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index b49451a..e38439b 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -24,8 +24,8 @@ #include "exec/exec-node.h" #include "gen-cpp/ImpalaInternalService_constants.h" #include "util/hdfs-util.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/raw-value.inline.h" #include "runtime/row-batch.h" @@ -56,8 +56,7 @@ namespace impala { const static string& ROOT_PARTITION_KEY = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY; -HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, - const vector<TExpr>& select_list_texprs, const TDataSink& tsink) +HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, const TDataSink& tsink) : DataSink(row_desc), table_desc_(nullptr), default_partition_(nullptr), @@ -66,8 +65,6 @@ HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count ? tsink.table_sink.hdfs_table_sink.skip_header_line_count : 0), - select_list_texprs_(select_list_texprs), - partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs), overwrite_(tsink.table_sink.hdfs_table_sink.overwrite), input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered), sort_columns_(tsink.table_sink.hdfs_table_sink.sort_columns), @@ -83,30 +80,12 @@ OutputPartition::OutputPartition() partition_descriptor(nullptr), block_size(0) {} -Status HdfsTableSink::PrepareExprs(RuntimeState* state) { - // Prepare select list expressions. - // Disable codegen for these - they would be unused anyway. - // TODO: codegen table sink - RETURN_IF_ERROR( - Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get())); - RETURN_IF_ERROR( - Expr::Prepare(partition_key_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get())); - - // Prepare partition key exprs and gather dynamic partition key exprs. - for (size_t i = 0; i < partition_key_expr_ctxs_.size(); ++i) { - // Remember non-constant partition key exprs for building hash table of Hdfs files. - if (!partition_key_expr_ctxs_[i]->root()->is_constant()) { - dynamic_partition_key_expr_ctxs_.push_back(partition_key_expr_ctxs_[i]); - } - } - // Sanity check. 
- DCHECK_LE(partition_key_expr_ctxs_.size(), table_desc_->num_cols()) - << DebugString(); - DCHECK_EQ(partition_key_expr_ctxs_.size(), table_desc_->num_clustering_cols()) - << DebugString(); - DCHECK_GE(output_expr_ctxs_.size(), - table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString(); - +Status HdfsTableSink::Init(const vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state) { + RETURN_IF_ERROR(DataSink::Init(thrift_output_exprs, tsink, state)); + DCHECK(tsink.__isset.table_sink); + RETURN_IF_ERROR(ScalarExpr::Create(tsink.table_sink.hdfs_table_sink.partition_key_exprs, + row_desc_, state, &partition_key_exprs_)); return Status::OK(); } @@ -114,6 +93,8 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); unique_id_str_ = PrintId(state->fragment_instance_id(), "-"); SCOPED_TIMER(profile()->total_time_counter()); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state, + state->obj_pool(), expr_mem_pool(), &partition_key_expr_evals_)); // TODO: Consider a system-wide random number generator, initialised in a single place. ptime now = microsec_clock::local_time(); @@ -122,11 +103,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke VLOG_QUERY << "Random seed: " << seed; srand(seed); - RETURN_IF_ERROR(Expr::CreateExprTrees( - state->obj_pool(), partition_key_texprs_, &partition_key_expr_ctxs_)); - RETURN_IF_ERROR(Expr::CreateExprTrees( - state->obj_pool(), select_list_texprs_, &output_expr_ctxs_)); - // Resolve table id and set input tuple descriptor. table_desc_ = static_cast<const HdfsTableDescriptor*>( state->desc_tbl().GetTableDescriptor(table_id_)); @@ -140,16 +116,25 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke staging_dir_ = Substitute("$0/_impala_insert_staging/$1", table_desc_->hdfs_base_dir(), PrintId(state->query_id(), "_")); - RETURN_IF_ERROR(PrepareExprs(state)); + // Prepare partition key exprs and gather dynamic partition key exprs. + for (size_t i = 0; i < partition_key_expr_evals_.size(); ++i) { + // Remember non-constant partition key exprs for building hash table of Hdfs files. + if (!partition_key_expr_evals_[i]->root().is_constant()) { + dynamic_partition_key_expr_evals_.push_back(partition_key_expr_evals_[i]); + } + } + // Sanity check. 
+ DCHECK_LE(partition_key_expr_evals_.size(), table_desc_->num_cols()) + << DebugString(); + DCHECK_EQ(partition_key_expr_evals_.size(), table_desc_->num_clustering_cols()) + << DebugString(); + DCHECK_GE(output_expr_evals_.size(), + table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString(); - partitions_created_counter_ = - ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT); - files_created_counter_ = - ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT); - rows_inserted_counter_ = - ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT); - bytes_written_counter_ = - ADD_COUNTER(profile(), "BytesWritten", TUnit::BYTES); + partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT); + files_created_counter_ = ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT); + rows_inserted_counter_ = ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT); + bytes_written_counter_ = ADD_COUNTER(profile(), "BytesWritten", TUnit::BYTES); encode_timer_ = ADD_TIMER(profile(), "EncodeTimer"); hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer"); compress_timer_ = ADD_TIMER(profile(), "CompressTimer"); @@ -158,8 +143,9 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke } Status HdfsTableSink::Open(RuntimeState* state) { - RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state)); - RETURN_IF_ERROR(Expr::Open(partition_key_expr_ctxs_, state)); + RETURN_IF_ERROR(DataSink::Open(state)); + DCHECK_EQ(partition_key_exprs_.size(), partition_key_expr_evals_.size()); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_key_expr_evals_, state)); // Get file format for default partition in table descriptor, and build a map from // partition key values to partition descriptor for multiple output format support. The @@ -182,14 +168,15 @@ Status HdfsTableSink::Open(RuntimeState* state) { // Only relevant partitions are remembered in partition_descriptor_map_. bool relevant_partition = true; HdfsPartitionDescriptor* partition = id_to_desc.second; - DCHECK_EQ(partition->partition_key_value_ctxs().size(), - partition_key_expr_ctxs_.size()); - vector<ExprContext*> dynamic_partition_key_value_ctxs; - for (size_t i = 0; i < partition_key_expr_ctxs_.size(); ++i) { + DCHECK_EQ(partition->partition_key_value_evals().size(), + partition_key_expr_evals_.size()); + vector<ScalarExprEvaluator*> dynamic_partition_key_value_evals; + for (size_t i = 0; i < partition_key_expr_evals_.size(); ++i) { // Remember non-constant partition key exprs for building hash table of Hdfs files - if (!partition_key_expr_ctxs_[i]->root()->is_constant()) { - dynamic_partition_key_value_ctxs.push_back( - partition->partition_key_value_ctxs()[i]); + DCHECK(&partition_key_expr_evals_[i]->root() == partition_key_exprs_[i]); + if (!partition_key_exprs_[i]->is_constant()) { + dynamic_partition_key_value_evals.push_back( + partition->partition_key_value_evals()[i]); } else { // Deal with the following: one partition has (year=2009, month=3); another has // (year=2010, month=3). @@ -198,9 +185,9 @@ Status HdfsTableSink::Open(RuntimeState* state) { // partition keys. So only keep a reference to the partition which matches // partition_key_values for constant values, since only that is written to. 
void* table_partition_key_value = - partition->partition_key_value_ctxs()[i]->GetValue(nullptr); + partition->partition_key_value_evals()[i]->GetValue(nullptr); void* target_partition_key_value = - partition_key_expr_ctxs_[i]->GetValue(nullptr); + partition_key_expr_evals_[i]->GetValue(nullptr); if (table_partition_key_value == nullptr && target_partition_key_value == nullptr) { continue; @@ -208,7 +195,7 @@ Status HdfsTableSink::Open(RuntimeState* state) { if (table_partition_key_value == nullptr || target_partition_key_value == nullptr || !RawValue::Eq(table_partition_key_value, target_partition_key_value, - partition_key_expr_ctxs_[i]->root()->type())) { + partition_key_expr_evals_[i]->root().type())) { relevant_partition = false; break; } @@ -218,7 +205,7 @@ Status HdfsTableSink::Open(RuntimeState* state) { string key; // Pass nullptr as row, since all of these expressions are constant, and can // therefore be evaluated without a valid row context. - GetHashTblKey(nullptr, dynamic_partition_key_value_ctxs, &key); + GetHashTblKey(nullptr, dynamic_partition_key_value_evals, &key); DCHECK(partition_descriptor_map_.find(key) == partition_descriptor_map_.end()) << "Partitions with duplicate 'static' keys found during INSERT"; partition_descriptor_map_[key] = partition; @@ -291,14 +278,14 @@ Status HdfsTableSink::WriteRowsToPartition( Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) { DCHECK_GT(batch->num_rows(), 0); - DCHECK(!dynamic_partition_key_expr_ctxs_.empty()); + DCHECK(!dynamic_partition_key_expr_evals_.empty()); DCHECK(input_is_clustered_); // Initialize the clustered partition and key. if (current_clustered_partition_ == nullptr) { const TupleRow* current_row = batch->GetRow(0); - GetHashTblKey( - current_row, dynamic_partition_key_expr_ctxs_, ¤t_clustered_partition_key_); + GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, + ¤t_clustered_partition_key_); RETURN_IF_ERROR(GetOutputPartition(state, current_row, current_clustered_partition_key_, ¤t_clustered_partition_, false)); } @@ -306,8 +293,8 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc // Compare the last row of the batch to the last current partition key. If they match, // then all the rows in the batch have the same key and can be written as a whole. 
string last_row_key; - GetHashTblKey(batch->GetRow(batch->num_rows() - 1), dynamic_partition_key_expr_ctxs_, - &last_row_key); + GetHashTblKey(batch->GetRow(batch->num_rows() - 1), + dynamic_partition_key_expr_evals_, &last_row_key); if (last_row_key == current_clustered_partition_key_) { DCHECK(current_clustered_partition_->second.empty()); RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_)); @@ -320,7 +307,7 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc const TupleRow* current_row = batch->GetRow(i); string key; - GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key); + GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key); if (current_clustered_partition_key_ != key) { DCHECK(current_clustered_partition_ != nullptr); @@ -341,7 +328,7 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc } #ifdef DEBUG string debug_row_key; - GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &debug_row_key); + GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &debug_row_key); DCHECK_EQ(current_clustered_partition_key_, debug_row_key); #endif DCHECK(current_clustered_partition_ != nullptr); @@ -435,15 +422,15 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/" // etc. stringstream partition_name_ss; - for (int j = 0; j < partition_key_expr_ctxs_.size(); ++j) { + for (int j = 0; j < partition_key_expr_evals_.size(); ++j) { partition_name_ss << table_desc_->col_descs()[j].name() << "="; - void* value = partition_key_expr_ctxs_[j]->GetValue(row); + void* value = partition_key_expr_evals_[j]->GetValue(row); // nullptr partition keys get a special value to be compatible with Hive. if (value == nullptr) { partition_name_ss << table_desc_->null_partition_key_value(); } else { string value_str; - partition_key_expr_ctxs_[j]->PrintValue(value, &value_str); + partition_key_expr_evals_[j]->PrintValue(value, &value_str); // Directory names containing partition-key values need to be UrlEncoded, in // particular to avoid problems when '/' is part of the key value (which might // occur, for example, with date strings). 
Hive will URL decode the value @@ -511,26 +498,22 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, case THdfsFileFormat::TEXT: output_partition->writer.reset( new HdfsTextTableWriter( - this, state, output_partition, &partition_descriptor, table_desc_, - output_expr_ctxs_)); + this, state, output_partition, &partition_descriptor, table_desc_)); break; case THdfsFileFormat::PARQUET: output_partition->writer.reset( new HdfsParquetTableWriter( - this, state, output_partition, &partition_descriptor, table_desc_, - output_expr_ctxs_)); + this, state, output_partition, &partition_descriptor, table_desc_)); break; case THdfsFileFormat::SEQUENCE_FILE: output_partition->writer.reset( new HdfsSequenceTableWriter( - this, state, output_partition, &partition_descriptor, table_desc_, - output_expr_ctxs_)); + this, state, output_partition, &partition_descriptor, table_desc_)); break; case THdfsFileFormat::AVRO: output_partition->writer.reset( new HdfsAvroTableWriter( - this, state, output_partition, &partition_descriptor, table_desc_, - output_expr_ctxs_)); + this, state, output_partition, &partition_descriptor, table_desc_)); break; default: stringstream error_msg; @@ -551,12 +534,12 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, return CreateNewTmpFile(state, output_partition); } -void HdfsTableSink::GetHashTblKey( - const TupleRow* row, const vector<ExprContext*>& ctxs, string* key) { +void HdfsTableSink::GetHashTblKey(const TupleRow* row, + const vector<ScalarExprEvaluator*>& evals, string* key) { stringstream hash_table_key; - for (int i = 0; i < ctxs.size(); ++i) { + for (int i = 0; i < evals.size(); ++i) { RawValue::PrintValueAsBytes( - ctxs[i]->GetValue(row), ctxs[i]->root()->type(), &hash_table_key); + evals[i]->GetValue(row), evals[i]->root().type(), &hash_table_key); // Additionally append "/" to avoid accidental key collisions. hash_table_key << "/"; } @@ -616,14 +599,14 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) { SCOPED_TIMER(profile()->total_time_counter()); - ExprContext::FreeLocalAllocations(output_expr_ctxs_); - ExprContext::FreeLocalAllocations(partition_key_expr_ctxs_); + ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_); + ScalarExprEvaluator::FreeLocalAllocations(partition_key_expr_evals_); RETURN_IF_ERROR(state->CheckQueryState()); // We don't do any work for an empty batch. if (batch->num_rows() == 0) return Status::OK(); // If there are no partition keys then just pass the whole batch to one partition. - if (dynamic_partition_key_expr_ctxs_.empty()) { + if (dynamic_partition_key_expr_evals_.empty()) { // If there are no dynamic keys just use an empty key. 
PartitionPair* partition_pair; RETURN_IF_ERROR( @@ -636,7 +619,7 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) { const TupleRow* current_row = batch->GetRow(i); string key; - GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key); + GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key); PartitionPair* partition_pair = nullptr; RETURN_IF_ERROR( GetOutputPartition(state, current_row, key, &partition_pair, false)); @@ -695,7 +678,7 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) { DCHECK(!closed_); SCOPED_TIMER(profile()->total_time_counter()); - if (dynamic_partition_key_expr_ctxs_.empty()) { + if (dynamic_partition_key_expr_evals_.empty()) { // Make sure we create an output partition even if the input is empty because we need // it to delete the existing data for 'insert overwrite'. PartitionPair* dummy; @@ -727,9 +710,8 @@ void HdfsTableSink::Close(RuntimeState* state) { if (!close_status.ok()) state->LogError(close_status.msg()); } partition_keys_to_output_partitions_.clear(); - - Expr::Close(output_expr_ctxs_, state); - Expr::Close(partition_key_expr_ctxs_, state); + ScalarExprEvaluator::Close(partition_key_expr_evals_, state); + ScalarExpr::Close(partition_key_exprs_); DataSink::Close(state); closed_ = true; } @@ -743,8 +725,9 @@ string HdfsTableSink::DebugString() const { stringstream out; out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false") << " table_desc=" << table_desc_->DebugString() - << " partition_key_exprs=" << Expr::DebugString(partition_key_expr_ctxs_) - << " output_exprs=" << Expr::DebugString(output_expr_ctxs_) + << " partition_key_exprs=" + << ScalarExpr::DebugString(partition_key_exprs_) + << " output_exprs=" << ScalarExpr::DebugString(output_exprs_) << ")"; return out.str(); }
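For orientation, the expression lifecycle the sink follows after this change can be condensed from the hunks above: thrift exprs become ScalarExpr trees in Init(), per-sink evaluators are created in Prepare(), opened in Open(), and both are torn down in Close(). The sketch below is assembled only from calls that appear in this patch; the class name, member declarations, and error paths are elided or hypothetical, so treat it as an illustration rather than the actual sink code.

Status HdfsTableSinkSketch::Init(const vector<TExpr>& thrift_output_exprs,
    const TDataSink& tsink, RuntimeState* state) {
  // Translate thrift expressions into ScalarExpr trees (no runtime state yet).
  RETURN_IF_ERROR(DataSink::Init(thrift_output_exprs, tsink, state));
  return ScalarExpr::Create(tsink.table_sink.hdfs_table_sink.partition_key_exprs,
      row_desc_, state, &partition_key_exprs_);
}

Status HdfsTableSinkSketch::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
  // Create evaluators that hold the per-sink evaluation state for the exprs.
  return ScalarExprEvaluator::Create(partition_key_exprs_, state,
      state->obj_pool(), expr_mem_pool(), &partition_key_expr_evals_);
}

Status HdfsTableSinkSketch::Open(RuntimeState* state) {
  RETURN_IF_ERROR(DataSink::Open(state));
  // After Open() the evaluators can be used, e.g. via GetValue(row).
  return ScalarExprEvaluator::Open(partition_key_expr_evals_, state);
}

void HdfsTableSinkSketch::Close(RuntimeState* state) {
  // Evaluators are closed before the ScalarExprs they reference.
  ScalarExprEvaluator::Close(partition_key_expr_evals_, state);
  ScalarExpr::Close(partition_key_exprs_);
  DataSink::Close(state);
}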
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index b928a20..28581f9 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -127,8 +127,7 @@ struct OutputPartition { /// This is consistent with Hive's behavior. class HdfsTableSink : public DataSink { public: - HdfsTableSink(const RowDescriptor& row_desc, - const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink); + HdfsTableSink(const RowDescriptor& row_desc, const TDataSink& tsink); virtual std::string GetName() { return "HdfsTableSink"; } @@ -163,6 +162,10 @@ class HdfsTableSink : public DataSink { std::string DebugString() const; + protected: + virtual Status Init(const std::vector<TExpr>& thrift_output_exprs, + const TDataSink& tsink, RuntimeState* state) WARN_UNUSED_RESULT; + private: /// Initialises the filenames of a given output partition, and opens the temporary file. /// The partition key is derived from 'row'. If the partition will not have any rows @@ -193,8 +196,8 @@ class HdfsTableSink : public DataSink { /// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs, /// evaluated against 'row'. The generated string is much shorter than the full Hdfs /// file name. - void GetHashTblKey( - const TupleRow* row, const std::vector<ExprContext*>& ctxs, std::string* key); + void GetHashTblKey(const TupleRow* row, + const std::vector<ScalarExprEvaluator*>& evals, std::string* key); /// Given a hashed partition key, get the output partition structure from /// the 'partition_keys_to_output_partitions_'. 'no_more_rows' indicates that no more @@ -203,9 +206,6 @@ class HdfsTableSink : public DataSink { const std::string& key, PartitionPair** partition_pair, bool no_more_rows) WARN_UNUSED_RESULT; - /// Initialise and prepare select and partition key expressions - Status PrepareExprs(RuntimeState* state) WARN_UNUSED_RESULT; - /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition. /// The Hdfs directory is created from the target table's base Hdfs dir, /// the partition_key_names_ and the evaluated partition_key_exprs_. @@ -246,9 +246,6 @@ class HdfsTableSink : public DataSink { /// Currently this is the default partition since we don't support multi-format sinks. const HdfsPartitionDescriptor* default_partition_; - /// Exprs that materialize output values - std::vector<ExprContext*> output_expr_ctxs_; - /// Table id resolved in Prepare() to set tuple_desc_; TableId table_id_; @@ -257,17 +254,6 @@ class HdfsTableSink : public DataSink { /// scanners while reading from the files. int skip_header_line_count_; - /// Thrift representation of select list exprs, saved in the constructor - /// to be used to initialise output_exprs_ in Init - const std::vector<TExpr>& select_list_texprs_; - - /// Thrift representation of partition keys, saved in the constructor - /// to be used to initialise partition_key_exprs_ in Init - const std::vector<TExpr>& partition_key_texprs_; - - /// Exprs of partition keys. - std::vector<ExprContext*> partition_key_expr_ctxs_; - /// Indicates whether the existing partitions should be overwritten. bool overwrite_; @@ -303,9 +289,13 @@ class HdfsTableSink : public DataSink { /// OutputPartition in the map to simplify the code. PartitionMap partition_keys_to_output_partitions_; - /// Subset of partition_key_expr_ctxs_ which are not constant. 
Set in Prepare(). + /// Expressions for computing the target partitions to which a row is written. + std::vector<ScalarExpr*> partition_key_exprs_; + std::vector<ScalarExprEvaluator*> partition_key_expr_evals_; + + /// Subset of partition_key_expr_evals_ which are not constant. Set in Prepare(). /// Used for generating the string key of hash_tbl_. - std::vector<ExprContext*> dynamic_partition_key_expr_ctxs_; + std::vector<ScalarExprEvaluator*> dynamic_partition_key_expr_evals_; /// Map from row key (i.e. concatenated non-constant partition keys) to /// partition descriptor. We don't own the HdfsPartitionDescriptors, they http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc index b84915a..edc4be8 100644 --- a/be/src/exec/hdfs-table-writer.cc +++ b/be/src/exec/hdfs-table-writer.cc @@ -27,18 +27,17 @@ namespace impala { HdfsTableWriter::HdfsTableWriter(HdfsTableSink* parent, - RuntimeState* state, OutputPartition* output, - const HdfsPartitionDescriptor* partition_desc, - const HdfsTableDescriptor* table_desc, - const vector<ExprContext*>& output_expr_ctxs) + RuntimeState* state, OutputPartition* output, + const HdfsPartitionDescriptor* partition_desc, const HdfsTableDescriptor* table_desc) : parent_(parent), state_(state), output_(output), table_desc_(table_desc), - output_expr_ctxs_(output_expr_ctxs) { + output_expr_evals_(parent->output_expr_evals()) { int num_non_partition_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols(); - DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString(); + DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols) + << parent_->DebugString(); } Status HdfsTableWriter::Write(const uint8_t* data, int32_t len) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h index cad304f..cc08b00 100644 --- a/be/src/exec/hdfs-table-writer.h +++ b/be/src/exec/hdfs-table-writer.h @@ -27,13 +27,13 @@ namespace impala { -class RuntimeState; -class OutputPartition; -class ExprContext; -class RowBatch; class HdfsPartitionDescriptor; class HdfsTableDescriptor; class HdfsTableSink; +class OutputPartition; +class RowBatch; +class RuntimeState; +class ScalarExprEvaluator; /// Pure virtual class for writing to hdfs table partition files. /// Subclasses implement the code needed to write to a specific file type. @@ -46,12 +46,10 @@ class HdfsTableWriter { /// output_partition -- Information on the output partition file. /// partition -- the descriptor for the partition being written /// table_desc -- the descriptor for the table being written. - /// output_exprs -- expressions which generate the output values. HdfsTableWriter(HdfsTableSink* parent, RuntimeState* state, OutputPartition* output_partition, const HdfsPartitionDescriptor* partition_desc, - const HdfsTableDescriptor* table_desc, - const std::vector<ExprContext*>& output_expr_ctxs); + const HdfsTableDescriptor* table_desc); virtual ~HdfsTableWriter() { } @@ -129,8 +127,9 @@ class HdfsTableWriter { /// Table descriptor of table to be written. const HdfsTableDescriptor* table_desc_; - /// Expressions that materialize output values. 
- std::vector<ExprContext*> output_expr_ctxs_; + /// Reference to the evaluators of expressions which generate the output value. + /// The evaluators are owned by sink which owns this table writer. + const std::vector<ScalarExprEvaluator*>& output_expr_evals_; /// Subclass should populate any file format specific stats. TInsertStats stats_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 0a66460..c1a12d6 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -308,7 +308,7 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) { char* col = boundary_column_.buffer(); int num_fields = 0; RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(), - &col, &num_fields, &field_locations_[0])); + &col, &num_fields, field_locations_.data())); TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow()); int max_tuples = row_batch->capacity() - row_batch->num_rows(); @@ -376,8 +376,8 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) { SCOPED_TIMER(parse_delimiter_timer_); RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(max_tuples, byte_buffer_end_ - byte_buffer_ptr_, &byte_buffer_ptr_, - &row_end_locations_[0], - &field_locations_[0], num_tuples, &num_fields, &col_start)); + row_end_locations_.data(), field_locations_.data(), num_tuples, + &num_fields, &col_start)); } // Materialize the tuples into the in memory format for this query @@ -387,7 +387,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) { // There can be one partial tuple which returned no more fields from this buffer. DCHECK_LE(*num_tuples, num_fields + 1); if (!boundary_column_.IsEmpty()) { - RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool)); + RETURN_IF_ERROR(CopyBoundaryField(field_locations_.data(), pool)); boundary_column_.Clear(); } num_tuples_materialized = WriteFields(num_fields, *num_tuples, pool, tuple_row_mem); @@ -742,13 +742,13 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) { // codegen'd using the IRBuilder for the specific tuple description. This function // is then injected into the cross-compiled driving function, WriteAlignedTuples(). 
Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node, - const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) { + const vector<ScalarExpr*>& conjuncts, Function** write_aligned_tuples_fn) { *write_aligned_tuples_fn = nullptr; DCHECK(node->runtime_state()->ShouldCodegen()); LlvmCodeGen* codegen = node->runtime_state()->codegen(); DCHECK(codegen != nullptr); Function* write_complete_tuple_fn; - RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs, + RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts, &write_complete_tuple_fn)); DCHECK(write_complete_tuple_fn != nullptr); RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn, @@ -786,7 +786,7 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool, TupleRow* row) { SCOPED_TIMER(scan_node_->materialize_tuple_timer()); - FieldLocation* fields = &field_locations_[0]; + FieldLocation* fields = field_locations_.data(); int num_tuples_processed = 0; int num_tuples_materialized = 0; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index 042cd18..af8fa8a 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -62,7 +62,7 @@ class HdfsTextScanner : public HdfsScanner { /// Codegen WriteAlignedTuples(). Stores the resulting function in /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise. static Status Codegen(HdfsScanNodeBase* node, - const std::vector<ExprContext*>& conjunct_ctxs, + const std::vector<ScalarExpr*>& conjuncts, llvm::Function** write_aligned_tuples_fn); /// Suffix for lzo index files. 
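Several hunks in hdfs-text-scanner.cc (and later in kudu-scanner.cc and nested-loop-join-node.cc) replace &vec[0] with vec.data(). The snippet below is a self-contained illustration of the difference and is not Impala code: data() is well defined even when the vector is empty, whereas forming &vec[0] on an empty vector indexes a nonexistent element and is undefined behaviour.

#include <cassert>
#include <vector>

int main() {
  std::vector<int> empty_vec;
  const int* p = empty_vec.data();   // Well defined: may be null, never dereferenced here.
  (void)p;
  // const int* bad = &empty_vec[0]; // Undefined behaviour on an empty vector.

  std::vector<int> filled = {1, 2, 3};
  assert(filled.data() == &filled[0]);  // Equivalent once elements exist.
  return 0;
}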
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc index cba4032..82db972 100644 --- a/be/src/exec/hdfs-text-table-writer.cc +++ b/be/src/exec/hdfs-text-table-writer.cc @@ -17,8 +17,8 @@ #include "exec/hdfs-text-table-writer.h" #include "exec/exec-node.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/hdfs-fs-cache.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" @@ -45,12 +45,10 @@ static const int64_t COMPRESSED_BUFFERED_SIZE = 60 * 1024 * 1024; namespace impala { HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent, - RuntimeState* state, OutputPartition* output, - const HdfsPartitionDescriptor* partition, - const HdfsTableDescriptor* table_desc, - const vector<ExprContext*>& output_expr_ctxs) - : HdfsTableWriter( - parent, state, output, partition, table_desc, output_expr_ctxs) { + RuntimeState* state, OutputPartition* output, + const HdfsPartitionDescriptor* partition, + const HdfsTableDescriptor* table_desc) + : HdfsTableWriter(parent, state, output, partition, table_desc) { tuple_delim_ = partition->line_delim(); field_delim_ = partition->field_delim(); escape_char_ = partition->escape_char(); @@ -112,7 +110,7 @@ Status HdfsTextTableWriter::AppendRows( bool all_rows = row_group_indices.empty(); int num_non_partition_cols = table_desc_->num_cols() - table_desc_->num_clustering_cols(); - DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString(); + DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols) << parent_->DebugString(); { SCOPED_TIMER(parent_->encode_timer()); @@ -126,9 +124,9 @@ Status HdfsTextTableWriter::AppendRows( // partition col exprs are the last in output exprs, it's ok to just write // the first num_non_partition_cols values. for (int j = 0; j < num_non_partition_cols; ++j) { - void* value = output_expr_ctxs_[j]->GetValue(current_row); + void* value = output_expr_evals_[j]->GetValue(current_row); if (value != NULL) { - const ColumnType& type = output_expr_ctxs_[j]->root()->type(); + const ColumnType& type = output_expr_evals_[j]->root().type(); if (type.type == TYPE_CHAR) { char* val_ptr = StringValue::CharSlotToPtr(value, type); StringValue sv(val_ptr, StringValue::UnpaddedCharLength(val_ptr, type.len)); @@ -136,7 +134,7 @@ Status HdfsTextTableWriter::AppendRows( } else if (type.IsVarLenStringType()) { PrintEscaped(reinterpret_cast<const StringValue*>(value)); } else { - output_expr_ctxs_[j]->PrintValue(value, &rowbatch_stringstream_); + output_expr_evals_[j]->PrintValue(value, &rowbatch_stringstream_); } } else { // NULLs in hive are encoded based on the 'serialization.null.format' property. 
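The new comment in hdfs-table-writer.h states that the evaluators are owned by the sink that owns the writer, and the writer only binds a reference to them (output_expr_evals_(parent->output_expr_evals())). Below is a minimal, self-contained illustration of that owner/borrower pattern with hypothetical names and no Impala dependencies; it only shows the lifetime relationship, not the real classes.

#include <vector>

struct Evaluator {};  // Stand-in for ScalarExprEvaluator.

class Sink {
 public:
  const std::vector<Evaluator*>& output_evals() const { return evals_; }
 private:
  std::vector<Evaluator*> evals_;  // Owned and closed by the sink.
};

class Writer {
 public:
  explicit Writer(Sink* parent) : output_evals_(parent->output_evals()) {}
 private:
  // Borrowed reference: valid only while the owning sink is alive, which holds
  // because the sink also owns the writer.
  const std::vector<Evaluator*>& output_evals_;
};

int main() {
  Sink sink;
  Writer writer(&sink);
  (void)writer;
  return 0;
}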
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-table-writer.h b/be/src/exec/hdfs-text-table-writer.h index 2944f23..589ed23 100644 --- a/be/src/exec/hdfs-text-table-writer.h +++ b/be/src/exec/hdfs-text-table-writer.h @@ -43,10 +43,9 @@ class TupleRow; class HdfsTextTableWriter : public HdfsTableWriter { public: HdfsTextTableWriter(HdfsTableSink* parent, - RuntimeState* state, OutputPartition* output, - const HdfsPartitionDescriptor* partition, - const HdfsTableDescriptor* table_desc, - const std::vector<ExprContext*>& output_expr_ctxs); + RuntimeState* state, OutputPartition* output, + const HdfsPartitionDescriptor* partition, + const HdfsTableDescriptor* table_desc); ~HdfsTextTableWriter() { } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc index c8a7870..d587660 100644 --- a/be/src/exec/kudu-scan-node-base.cc +++ b/be/src/exec/kudu-scan-node-base.cc @@ -137,8 +137,4 @@ void KuduScanNodeBase::StopAndFinalizeCounters() { PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_); } -Status KuduScanNodeBase::GetConjunctCtxs(vector<ExprContext*>* ctxs) { - return Expr::CloneIfNotExists(conjunct_ctxs_, runtime_state_, ctxs); -} - } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h index 70e5b94..49af13c 100644 --- a/be/src/exec/kudu-scan-node-base.h +++ b/be/src/exec/kudu-scan-node-base.h @@ -98,10 +98,6 @@ class KuduScanNodeBase : public ScanNode { static const std::string KUDU_ROUND_TRIPS; static const std::string KUDU_REMOTE_TOKENS; - /// Returns a cloned copy of the scan node's conjuncts. Requires that the expressions - /// have been open previously. 
- Status GetConjunctCtxs(vector<ExprContext*>* ctxs); - const TupleDescriptor* tuple_desc() const { return tuple_desc_; } kudu::client::KuduClient* kudu_client() { return client_; } RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index beead44..08a3339 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -21,6 +21,7 @@ #include "exec/kudu-scanner.h" #include "exec/kudu-util.h" +#include "exprs/scalar-expr.h" #include "gutil/gscoped_ptr.h" #include "runtime/fragment-instance-state.h" #include "runtime/mem-pool.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc index 3cae4af..3fb0a18 100644 --- a/be/src/exec/kudu-scanner.cc +++ b/be/src/exec/kudu-scanner.cc @@ -22,9 +22,9 @@ #include <vector> #include <string> -#include "exprs/expr.h" -#include "exprs/expr-context.h" #include "exec/kudu-util.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/raw-value.h" @@ -63,6 +63,7 @@ const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT"; KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state) : scan_node_(scan_node), state_(state), + expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())), cur_kudu_batch_num_read_(0), last_alive_time_micros_(0) { } @@ -73,7 +74,8 @@ Status KuduScanner::Open() { if (slot->type().type != TYPE_TIMESTAMP) continue; timestamp_slots_.push_back(slot); } - return scan_node_->GetConjunctCtxs(&conjunct_ctxs_); + return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_mem_pool_.get(), + scan_node_->conjunct_evals(), &conjunct_evals_); } void KuduScanner::KeepKuduScannerAlive() { @@ -128,7 +130,8 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) { void KuduScanner::Close() { if (scanner_) CloseCurrentClientScanner(); - Expr::Close(conjunct_ctxs_, state_); + ScalarExprEvaluator::Close(conjunct_evals_, state_); + expr_mem_pool_->FreeAll(); } Status KuduScanner::OpenNextScanToken(const string& scan_token) { @@ -185,7 +188,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me // Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into // 'row_batch'. - bool has_conjuncts = !conjunct_ctxs_.empty(); + bool has_conjuncts = !conjunct_evals_.empty(); int num_rows = cur_kudu_batch_.NumRows(); for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) { @@ -223,8 +226,8 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me // Evaluate the conjuncts that haven't been pushed down to Kudu. Conjunct evaluation // is performed directly on the Kudu tuple because its memory layout is identical to // Impala's. We only copy the surviving tuples to Impala's output row batch. 
- if (has_conjuncts && !ExecNode::EvalConjuncts(&conjunct_ctxs_[0], - conjunct_ctxs_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) { + if (has_conjuncts && !ExecNode::EvalConjuncts(conjunct_evals_.data(), + conjunct_evals_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) { continue; } // Deep copy the tuple, set it in a new row, and commit the row. @@ -238,7 +241,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me // Move to the next tuple in the tuple buffer. *tuple_mem = next_tuple(*tuple_mem); } - ExprContext::FreeLocalAllocations(conjunct_ctxs_); + ScalarExprEvaluator::FreeLocalAllocations(conjunct_evals_); // Check the status in case an error status was set during conjunct evaluation. return state_->GetQueryStatus(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h index 7a6ca76..e327e7a 100644 --- a/be/src/exec/kudu-scanner.h +++ b/be/src/exec/kudu-scanner.h @@ -21,6 +21,7 @@ #include <boost/scoped_ptr.hpp> #include <kudu/client/client.h> +#include "common/object-pool.h" #include "exec/kudu-scan-node-base.h" #include "runtime/descriptors.h" @@ -86,6 +87,13 @@ class KuduScanner { KuduScanNodeBase* scan_node_; RuntimeState* state_; + /// For objects which have the same life time as the scanner. + ObjectPool obj_pool_; + + /// MemPool used for expression evaluators in this scanner. Need to be local + /// to each scanner as MemPool is not thread safe. + boost::scoped_ptr<MemPool> expr_mem_pool_; + /// The kudu::client::KuduScanner for the current scan token. A new KuduScanner is /// created for each scan token using KuduScanToken::DeserializeIntoScanner(). boost::scoped_ptr<kudu::client::KuduScanner> scanner_; @@ -100,7 +108,7 @@ class KuduScanner { int64_t last_alive_time_micros_; /// The scanner's cloned copy of the conjuncts to apply. - vector<ExprContext*> conjunct_ctxs_; + vector<ScalarExprEvaluator*> conjunct_evals_; /// Timestamp slots in the tuple descriptor of the scan node. Used to convert Kudu /// UNIXTIME_MICRO values inline. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-table-sink.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc index b5ffc27..32aab79 100644 --- a/be/src/exec/kudu-table-sink.cc +++ b/be/src/exec/kudu-table-sink.cc @@ -21,8 +21,8 @@ #include <thrift/protocol/TDebugProtocol.h> #include "exec/kudu-util.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "gen-cpp/ImpalaInternalService_constants.h" #include "gutil/gscoped_ptr.h" #include "runtime/exec-env.h" @@ -71,31 +71,18 @@ const static string& ROOT_PARTITION_KEY = // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693). 
const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024; -KuduTableSink::KuduTableSink(const RowDescriptor& row_desc, - const vector<TExpr>& select_list_texprs, - const TDataSink& tsink) +KuduTableSink::KuduTableSink(const RowDescriptor& row_desc, const TDataSink& tsink) : DataSink(row_desc), table_id_(tsink.table_sink.target_table_id), - select_list_texprs_(select_list_texprs), sink_action_(tsink.table_sink.action), kudu_table_sink_(tsink.table_sink.kudu_table_sink) { + DCHECK(tsink.__isset.table_sink); DCHECK(KuduIsAvailable()); } -Status KuduTableSink::PrepareExprs(RuntimeState* state) { - // From the thrift expressions create the real exprs. - RETURN_IF_ERROR(Expr::CreateExprTrees(state->obj_pool(), select_list_texprs_, - &output_expr_ctxs_)); - // Prepare the exprs to run. - RETURN_IF_ERROR( - Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get())); - return Status::OK(); -} - Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) { RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker)); SCOPED_TIMER(profile()->total_time_counter()); - RETURN_IF_ERROR(PrepareExprs(state)); // Get the kudu table descriptor. TableDescriptor* table_desc = state->desc_tbl().GetTableDescriptor(table_id_); @@ -130,7 +117,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke } Status KuduTableSink::Open(RuntimeState* state) { - RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state)); + RETURN_IF_ERROR(DataSink::Open(state)); int64_t required_mem = FLAGS_kudu_sink_mem_required; if (!mem_tracker_->TryConsume(required_mem)) { @@ -206,7 +193,7 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() { Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) { SCOPED_TIMER(profile()->total_time_counter()); - ExprContext::FreeLocalAllocations(output_expr_ctxs_); + ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_); RETURN_IF_ERROR(state->CheckQueryState()); const KuduSchema& table_schema = table_->schema(); @@ -224,14 +211,14 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) { unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp()); bool add_row = true; - for (int j = 0; j < output_expr_ctxs_.size(); ++j) { - // output_expr_ctxs_ only contains the columns that the op + for (int j = 0; j < output_expr_evals_.size(); ++j) { + // output_expr_evals_ only contains the columns that the op // applies to, i.e. columns explicitly mentioned in the query, and // referenced_columns is then used to map to actual column positions. int col = kudu_table_sink_.referenced_columns.empty() ? 
j : kudu_table_sink_.referenced_columns[j]; - void* value = output_expr_ctxs_[j]->GetValue(current_row); + void* value = output_expr_evals_[j]->GetValue(current_row); if (value == nullptr) { if (table_schema.Column(col).is_nullable()) { KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col), @@ -249,7 +236,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) { } } - PrimitiveType type = output_expr_ctxs_[j]->root()->type().type; + PrimitiveType type = output_expr_evals_[j]->root().type().type; WriteKuduValue(col, type, value, true, write->mutable_row()); } if (add_row) write_ops.push_back(move(write)); @@ -339,7 +326,6 @@ void KuduTableSink::Close(RuntimeState* state) { client_ = nullptr; } SCOPED_TIMER(profile()->total_time_counter()); - Expr::Close(output_expr_ctxs_, state); DataSink::Close(state); closed_ = true; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-table-sink.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h index b2239fb..dcf5241 100644 --- a/be/src/exec/kudu-table-sink.h +++ b/be/src/exec/kudu-table-sink.h @@ -25,8 +25,8 @@ #include "common/status.h" #include "exec/kudu-util.h" #include "exec/data-sink.h" -#include "exprs/expr-context.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" namespace impala { @@ -53,8 +53,7 @@ namespace impala { /// status. All reported errors (ignored or not) will be logged via the RuntimeState. class KuduTableSink : public DataSink { public: - KuduTableSink(const RowDescriptor& row_desc, - const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink); + KuduTableSink(const RowDescriptor& row_desc, const TDataSink& tsink); virtual std::string GetName() { return "KuduTableSink"; } @@ -76,9 +75,6 @@ class KuduTableSink : public DataSink { virtual void Close(RuntimeState* state); private: - /// Turn thrift TExpr into Expr and prepare them to run - Status PrepareExprs(RuntimeState* state); - /// Create a new write operation according to the sink type. kudu::client::KuduWriteOperation* NewWriteOp(); @@ -94,11 +90,6 @@ class KuduTableSink : public DataSink { /// The descriptor of the KuduTable being written to. Set on Prepare(). const KuduTableDescriptor* table_desc_; - /// The expression descriptors and the prepared expressions. The latter are built - /// on Prepare(). - const std::vector<TExpr>& select_list_texprs_; - std::vector<ExprContext*> output_expr_ctxs_; - /// The Kudu client, owned by the ExecEnv. kudu::client::KuduClient* client_ = nullptr; /// The Kudu table and session. 
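A recurring point in the Kudu hunks above is that each KuduScanner clones the scan node's conjunct evaluators into scanner-local pools, because MemPool is not thread safe, and then tears the clones down itself. The following is condensed from the calls shown in this patch; the class name and surrounding members are elided, so it is a sketch rather than the full scanner.

Status KuduScannerSketch::Open() {
  // Clone the shared conjunct evaluators into this scanner's ObjectPool and
  // MemPool so concurrent scanner threads never share a MemPool.
  return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_mem_pool_.get(),
      scan_node_->conjunct_evals(), &conjunct_evals_);
}

void KuduScannerSketch::Close() {
  // Close the cloned evaluators and release the scanner-local expression memory.
  ScalarExprEvaluator::Close(conjunct_evals_, state_);
  expr_mem_pool_->FreeAll();
}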
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/nested-loop-join-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc index caa57b4..14d9ac8 100644 --- a/be/src/exec/nested-loop-join-node.cc +++ b/be/src/exec/nested-loop-join-node.cc @@ -20,7 +20,8 @@ #include <sstream> #include <gutil/strings/substitute.h> -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/row-batch.h" @@ -50,18 +51,20 @@ NestedLoopJoinNode::~NestedLoopJoinNode() { Status NestedLoopJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state)); DCHECK(tnode.__isset.nested_loop_join_node); - RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, tnode.nested_loop_join_node.join_conjuncts, - &join_conjunct_ctxs_)); - + // join_conjunct_evals_ are evaluated in the context of rows assembled from + // all inner and outer tuples. + RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc()); + RETURN_IF_ERROR(ScalarExpr::Create(tnode.nested_loop_join_node.join_conjuncts, + full_row_desc, state, &join_conjuncts_)); DCHECK(tnode.nested_loop_join_node.join_op != TJoinOp::CROSS_JOIN || - join_conjunct_ctxs_.size() == 0) << "Join conjuncts in a cross join"; + join_conjuncts_.size() == 0) << "Join conjuncts in a cross join"; return Status::OK(); } Status NestedLoopJoinNode::Open(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(BlockingJoinNode::Open(state)); - RETURN_IF_ERROR(Expr::Open(join_conjunct_ctxs_, state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(join_conjunct_evals_, state)); // Check for errors and free local allocations before opening children. RETURN_IF_CANCELLED(state); @@ -95,13 +98,8 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) { Status NestedLoopJoinNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(BlockingJoinNode::Prepare(state)); - - // join_conjunct_ctxs_ are evaluated in the context of rows assembled from - // all inner and outer tuples. 
- RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc()); - RETURN_IF_ERROR( - Expr::Prepare(join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker())); - + RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state, + pool_, expr_mem_pool(), &join_conjunct_evals_)); builder_.reset(new NljBuilder(child(1)->row_desc(), state)); RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker())); runtime_profile()->PrependChild(builder_->profile()); @@ -138,7 +136,8 @@ Status NestedLoopJoinNode::Reset(RuntimeState* state) { void NestedLoopJoinNode::Close(RuntimeState* state) { if (is_closed()) return; - Expr::Close(join_conjunct_ctxs_, state); + ScalarExprEvaluator::Close(join_conjunct_evals_, state); + ScalarExpr::Close(join_conjuncts_); if (builder_ != NULL) { builder_->Close(state); builder_.reset(); @@ -291,8 +290,9 @@ Status NestedLoopJoinNode::GetNextLeftOuterJoin(RuntimeState* state, Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state, RowBatch* output_batch) { - ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; - size_t num_join_ctxs = join_conjunct_ctxs_.size(); + ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data(); + size_t num_join_conjuncts = join_conjuncts_.size(); + DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size()); const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!eos_) { @@ -309,7 +309,8 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state, RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); } - if (!EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) { + if (!EvalConjuncts( + join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { continue; } // A match is found. Create the output row from the probe row. @@ -336,8 +337,9 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state, Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state, RowBatch* output_batch) { - ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; - size_t num_join_ctxs = join_conjunct_ctxs_.size(); + ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data(); + size_t num_join_conjuncts = join_conjuncts_.size(); + DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size()); const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!eos_) { @@ -354,7 +356,8 @@ Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state, RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); } - if (EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) { + if (EvalConjuncts( + join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { // Found a match for the probe row. This row will not be in the result. 
matched_probe_ = true; break; @@ -388,8 +391,9 @@ Status NestedLoopJoinNode::GetNextRightOuterJoin(RuntimeState* state, Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state, RowBatch* output_batch) { - ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; - size_t num_join_ctxs = join_conjunct_ctxs_.size(); + ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data(); + size_t num_join_conjuncts = join_conjuncts_.size(); + DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size()); DCHECK(matching_build_rows_ != NULL); const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); @@ -397,8 +401,8 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state, DCHECK(HasValidProbeRow()); while (!build_row_iterator_.AtEnd()) { DCHECK(HasValidProbeRow()); - // This loop can go on for a long time if the conjuncts are very selective. Do - // query maintenance every N iterations. + // This loop can go on for a long time if the conjuncts are very selective. + // Do query maintenance every N iterations. if ((current_build_row_idx_ & (N - 1)) == 0) { if (ReachedLimit()) { eos_ = true; @@ -418,7 +422,8 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state, CreateOutputRow(semi_join_staging_row_, current_probe_row_, build_row_iterator_.GetRow()); // Evaluate the join conjuncts on the semi-join staging row. - if (!EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) { + if (!EvalConjuncts( + join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { build_row_iterator_.Next(); ++current_build_row_idx_; continue; @@ -445,8 +450,9 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state, Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state, RowBatch* output_batch) { - ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; - size_t num_join_ctxs = join_conjunct_ctxs_.size(); + ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data(); + size_t num_join_conjuncts = join_conjuncts_.size(); + DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size()); DCHECK(matching_build_rows_ != NULL); const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); @@ -468,7 +474,8 @@ Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state, } CreateOutputRow(semi_join_staging_row_, current_probe_row_, build_row_iterator_.GetRow()); - if (EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) { + if (EvalConjuncts( + join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) { matching_build_rows_->Set(current_build_row_idx_, true); } build_row_iterator_.Next(); @@ -501,8 +508,9 @@ Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state, RowBatch* output_batch) { DCHECK(!matched_probe_); DCHECK(current_probe_row_ != NULL); - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - size_t num_ctxs = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + size_t num_conjuncts = conjuncts_.size(); + DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); TupleRow* output_row = output_batch->GetRow(output_batch->AddRow()); if (join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN) { CreateOutputRow(output_row, current_probe_row_, NULL); @@ -512,7 +520,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state, output_batch->CopyRow(current_probe_row_, output_row); } // Evaluate all the other (non-join) conjuncts. 
- if (EvalConjuncts(conjunct_ctxs, num_ctxs, output_row)) { + if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) { VLOG_ROW << "match row:" << PrintRow(output_row, row_desc()); output_batch->CommitLastRow(); ++num_rows_returned_; @@ -530,8 +538,9 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows( current_build_row_idx_ = 0; process_unmatched_build_rows_ = true; } - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - size_t num_ctxs = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + size_t num_conjuncts = conjuncts_.size(); + DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); DCHECK(matching_build_rows_ != NULL); const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); @@ -566,7 +575,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows( ++current_build_row_idx_; // Evaluate conjuncts that don't affect the matching rows of the join on the // result row. - if (EvalConjuncts(conjunct_ctxs, num_ctxs, output_row)) { + if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) { VLOG_ROW << "match row: " << PrintRow(output_row, row_desc()); output_batch->CommitLastRow(); ++num_rows_returned_; @@ -584,10 +593,12 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows( Status NestedLoopJoinNode::FindBuildMatches( RuntimeState* state, RowBatch* output_batch, bool* return_output_batch) { *return_output_batch = false; - ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0]; - size_t num_join_ctxs = join_conjunct_ctxs_.size(); - ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0]; - size_t num_ctxs = conjunct_ctxs_.size(); + ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data(); + size_t num_join_conjuncts = join_conjuncts_.size(); + DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size()); + ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data(); + size_t num_conjuncts = conjuncts_.size(); + DCHECK_EQ(num_conjuncts, conjunct_evals_.size()); const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size()); while (!build_row_iterator_.AtEnd()) { @@ -609,12 +620,14 @@ Status NestedLoopJoinNode::FindBuildMatches( RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); } - if (!EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, output_row)) continue; + if (!EvalConjuncts(join_conjunct_evals, num_join_conjuncts, output_row)) { + continue; + } matched_probe_ = true; if (matching_build_rows_ != NULL) { matching_build_rows_->Set(current_build_row_idx_ - 1, true); } - if (!EvalConjuncts(conjunct_ctxs, num_ctxs, output_row)) continue; + if (!EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) continue; VLOG_ROW << "match row: " << PrintRow(output_row, row_desc()); output_batch->CommitLastRow(); ++num_rows_returned_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/nested-loop-join-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h index 1d3d2da..94f1dae 100644 --- a/be/src/exec/nested-loop-join-node.h +++ b/be/src/exec/nested-loop-join-node.h @@ -83,7 +83,8 @@ class NestedLoopJoinNode : public BlockingJoinNode { ///////////////////////////////////////// /// Join conjuncts - std::vector<ExprContext*> join_conjunct_ctxs_; + std::vector<ScalarExpr*> join_conjuncts_; + std::vector<ScalarExprEvaluator*> join_conjunct_evals_; /// Optimized build for the case where the right child is a SingularRowSrcNode. 
Status ConstructSingularBuildSide(RuntimeState* state); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/old-hash-table-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table-ir.cc b/be/src/exec/old-hash-table-ir.cc index ef94892..2436ef1 100644 --- a/be/src/exec/old-hash-table-ir.cc +++ b/be/src/exec/old-hash-table-ir.cc @@ -29,12 +29,12 @@ uint8_t* OldHashTable::expr_value_null_bits() const { return expr_value_null_bits_; } -ExprContext* const* OldHashTable::build_expr_ctxs() const { - return &build_expr_ctxs_[0]; +ScalarExprEvaluator* const* OldHashTable::build_expr_evals() const { + return build_expr_evals_.data(); } -ExprContext* const* OldHashTable::probe_expr_ctxs() const { - return &probe_expr_ctxs_[0]; +ScalarExprEvaluator* const* OldHashTable::probe_expr_evals() const { + return probe_expr_evals_.data(); } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/old-hash-table-test.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/old-hash-table-test.cc b/be/src/exec/old-hash-table-test.cc index 31e0cac..e873791 100644 --- a/be/src/exec/old-hash-table-test.cc +++ b/be/src/exec/old-hash-table-test.cc @@ -22,8 +22,8 @@ #include "common/compiler-util.h" #include "exec/old-hash-table.inline.h" -#include "exprs/expr.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "exprs/slot-ref.h" #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" @@ -45,30 +45,37 @@ class OldHashTableTest : public testing::Test { ObjectPool pool_; MemTracker tracker_; MemPool mem_pool_; - vector<ExprContext*> build_expr_ctxs_; - vector<ExprContext*> probe_expr_ctxs_; + + vector<ScalarExpr*> build_exprs_; + vector<ScalarExprEvaluator*> build_expr_evals_; + vector<ScalarExpr*> probe_exprs_; + vector<ScalarExprEvaluator*> probe_expr_evals_; virtual void SetUp() { RowDescriptor desc; - Status status; - // Not very easy to test complex tuple layouts so this test will use the // simplest. The purpose of these tests is to exercise the hash map // internals so a simple build/probe expr is fine. 
- Expr* expr = pool_.Add(new SlotRef(TYPE_INT, 0)); - build_expr_ctxs_.push_back(pool_.Add(new ExprContext(expr))); - ASSERT_OK(Expr::Prepare(build_expr_ctxs_, NULL, desc, &tracker_)); - ASSERT_OK(Expr::Open(build_expr_ctxs_, NULL)); - - expr = pool_.Add(new SlotRef(TYPE_INT, 0)); - probe_expr_ctxs_.push_back(pool_.Add(new ExprContext(expr))); - ASSERT_OK(Expr::Prepare(probe_expr_ctxs_, NULL, desc, &tracker_)); - ASSERT_OK(Expr::Open(probe_expr_ctxs_, NULL)); + ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 0)); + ASSERT_OK(build_expr->Init(desc, nullptr)); + build_exprs_.push_back(build_expr); + ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_, + &build_expr_evals_)); + ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr)); + + ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 0)); + ASSERT_OK(probe_expr->Init(desc, nullptr)); + probe_exprs_.push_back(probe_expr); + ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_, + &probe_expr_evals_)); + ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr)); } virtual void TearDown() { - Expr::Close(build_expr_ctxs_, NULL); - Expr::Close(probe_expr_ctxs_, NULL); + ScalarExprEvaluator::Close(build_expr_evals_, nullptr); + ScalarExprEvaluator::Close(probe_expr_evals_, nullptr); + ScalarExpr::Close(build_exprs_); + ScalarExpr::Close(probe_exprs_); } TupleRow* CreateTupleRow(int32_t val) { @@ -95,10 +102,10 @@ class OldHashTableTest : public testing::Test { OldHashTable::Iterator iter = table->Begin(); while (iter != table->End()) { TupleRow* row = iter.GetRow(); - int32_t val = *reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(row)); + int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row)); EXPECT_GE(val, min); EXPECT_LT(val, max); - if (all_unique) EXPECT_TRUE(results[val] == NULL); + if (all_unique) EXPECT_TRUE(results[val] == nullptr); EXPECT_EQ(row->GetTuple(0), expected[val]->GetTuple(0)); results[val] = row; iter.Next<false>(); @@ -110,9 +117,9 @@ class OldHashTableTest : public testing::Test { void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) { EXPECT_TRUE(probe_row != build_row); int32_t build_val = - *reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(probe_row)); + *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(probe_row)); int32_t probe_val = - *reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(build_row)); + *reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(build_row)); EXPECT_EQ(build_val, probe_val); } @@ -160,16 +167,16 @@ TEST_F(OldHashTableTest, SetupTest) { TupleRow* probe_row4 = CreateTupleRow(4); int32_t* val_row1 = - reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(build_row1)); + reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row1)); EXPECT_EQ(*val_row1, 1); int32_t* val_row2 = - reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(build_row2)); + reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row2)); EXPECT_EQ(*val_row2, 2); int32_t* val_row3 = - reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(probe_row3)); + reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row3)); EXPECT_EQ(*val_row3, 3); int32_t* val_row4 = - reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(probe_row4)); + reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row4)); EXPECT_EQ(*val_row4, 4); mem_pool_.FreeAll(); @@ -196,53 +203,56 @@ TEST_F(OldHashTableTest, BasicTest) { // Create the hash table and 
insert the build rows MemTracker tracker; - OldHashTable hash_table(NULL, build_expr_ctxs_, probe_expr_ctxs_, - vector<ExprContext*>(), 1, false, std::vector<bool>(build_expr_ctxs_.size(), false), - 0, &tracker, vector<RuntimeFilter*>()); + scoped_ptr<OldHashTable> hash_table; + EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_, + vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false), + 0, &tracker, vector<RuntimeFilter*>(), &hash_table)); + EXPECT_OK(hash_table->Open(nullptr)); for (int i = 0; i < 5; ++i) { - hash_table.Insert(build_rows[i]); + hash_table->Insert(build_rows[i]); } - EXPECT_EQ(hash_table.size(), 5); + EXPECT_EQ(hash_table->size(), 5); // Do a full table scan and validate returned pointers - FullScan(&hash_table, 0, 5, true, scan_rows, build_rows); - ProbeTest(&hash_table, probe_rows, 10, false); + FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table.get(), probe_rows, 10, false); // Resize and scan again - ResizeTable(&hash_table, 64); - EXPECT_EQ(hash_table.num_buckets(), 64); - EXPECT_EQ(hash_table.size(), 5); + ResizeTable(hash_table.get(), 64); + EXPECT_EQ(hash_table->num_buckets(), 64); + EXPECT_EQ(hash_table->size(), 5); memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(&hash_table, 0, 5, true, scan_rows, build_rows); - ProbeTest(&hash_table, probe_rows, 10, false); + FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table.get(), probe_rows, 10, false); // Resize to two and cause some collisions - ResizeTable(&hash_table, 2); - EXPECT_EQ(hash_table.num_buckets(), 2); - EXPECT_EQ(hash_table.size(), 5); + ResizeTable(hash_table.get(), 2); + EXPECT_EQ(hash_table->num_buckets(), 2); + EXPECT_EQ(hash_table->size(), 5); memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(&hash_table, 0, 5, true, scan_rows, build_rows); - ProbeTest(&hash_table, probe_rows, 10, false); + FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table.get(), probe_rows, 10, false); // Resize to one and turn it into a linked list - ResizeTable(&hash_table, 1); - EXPECT_EQ(hash_table.num_buckets(), 1); - EXPECT_EQ(hash_table.size(), 5); + ResizeTable(hash_table.get(), 1); + EXPECT_EQ(hash_table->num_buckets(), 1); + EXPECT_EQ(hash_table->size(), 5); memset(scan_rows, 0, sizeof(scan_rows)); - FullScan(&hash_table, 0, 5, true, scan_rows, build_rows); - ProbeTest(&hash_table, probe_rows, 10, false); + FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows); + ProbeTest(hash_table.get(), probe_rows, 10, false); - hash_table.Close(); + hash_table->Close(nullptr); mem_pool_.FreeAll(); } // This tests makes sure we can scan ranges of buckets TEST_F(OldHashTableTest, ScanTest) { MemTracker tracker; - OldHashTable hash_table(NULL, build_expr_ctxs_, probe_expr_ctxs_, - vector<ExprContext*>(), 1, false, - std::vector<bool>(build_expr_ctxs_.size(), false), 0, &tracker, - vector<RuntimeFilter*>()); + scoped_ptr<OldHashTable> hash_table; + EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_, + vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false), + 0, &tracker, vector<RuntimeFilter*>(), &hash_table)); + EXPECT_OK(hash_table->Open(nullptr)); // Add 1 row with val 1, 2 with val 2, etc vector<TupleRow*> build_rows; ProbeTestData probe_rows[15]; @@ -251,7 +261,7 @@ TEST_F(OldHashTableTest, ScanTest) { probe_rows[val].probe_row = CreateTupleRow(val); for (int i = 0; i < val; ++i) { TupleRow* row = CreateTupleRow(val); - 
hash_table.Insert(row); + hash_table->Insert(row); build_rows.push_back(row); probe_rows[val].expected_build_rows.push_back(row); } @@ -263,22 +273,22 @@ TEST_F(OldHashTableTest, ScanTest) { } // Test that all the builds were found - ProbeTest(&hash_table, probe_rows, 15, true); + ProbeTest(hash_table.get(), probe_rows, 15, true); // Resize and try again - ResizeTable(&hash_table, 128); - EXPECT_EQ(hash_table.num_buckets(), 128); - ProbeTest(&hash_table, probe_rows, 15, true); + ResizeTable(hash_table.get(), 128); + EXPECT_EQ(hash_table->num_buckets(), 128); + ProbeTest(hash_table.get(), probe_rows, 15, true); - ResizeTable(&hash_table, 16); - EXPECT_EQ(hash_table.num_buckets(), 16); - ProbeTest(&hash_table, probe_rows, 15, true); + ResizeTable(hash_table.get(), 16); + EXPECT_EQ(hash_table->num_buckets(), 16); + ProbeTest(hash_table.get(), probe_rows, 15, true); - ResizeTable(&hash_table, 2); - EXPECT_EQ(hash_table.num_buckets(), 2); - ProbeTest(&hash_table, probe_rows, 15, true); + ResizeTable(hash_table.get(), 2); + EXPECT_EQ(hash_table->num_buckets(), 2); + ProbeTest(hash_table.get(), probe_rows, 15, true); - hash_table.Close(); + hash_table->Close(nullptr); mem_pool_.FreeAll(); } @@ -287,37 +297,38 @@ TEST_F(OldHashTableTest, GrowTableTest) { int num_to_add = 4; int expected_size = 0; MemTracker tracker(100 * 1024 * 1024); - OldHashTable hash_table(NULL, build_expr_ctxs_, probe_expr_ctxs_, - vector<ExprContext*>(), 1, false, - std::vector<bool>(build_expr_ctxs_.size(), false), 0, &tracker, - vector<RuntimeFilter*>(), false, num_to_add); - EXPECT_FALSE(hash_table.mem_limit_exceeded()); + scoped_ptr<OldHashTable> hash_table; + EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_, + vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false), + 0, &tracker, vector<RuntimeFilter*>(), &hash_table, false, num_to_add)); + EXPECT_OK(hash_table->Open(nullptr)); + EXPECT_FALSE(hash_table->mem_limit_exceeded()); EXPECT_TRUE(!tracker.LimitExceeded()); // This inserts about 5M entries int build_row_val = 0; for (int i = 0; i < 20; ++i) { for (int j = 0; j < num_to_add; ++build_row_val, ++j) { - hash_table.Insert(CreateTupleRow(build_row_val)); + hash_table->Insert(CreateTupleRow(build_row_val)); } expected_size += num_to_add; num_to_add *= 2; } - EXPECT_TRUE(hash_table.mem_limit_exceeded()); + EXPECT_TRUE(hash_table->mem_limit_exceeded()); EXPECT_TRUE(tracker.LimitExceeded()); // Validate that we can find the entries before we went over the limit for (int i = 0; i < expected_size * 5; i += 100000) { TupleRow* probe_row = CreateTupleRow(i); - OldHashTable::Iterator iter = hash_table.Find(probe_row); - if (i < hash_table.size()) { - EXPECT_TRUE(iter != hash_table.End()); + OldHashTable::Iterator iter = hash_table->Find(probe_row); + if (i < hash_table->size()) { + EXPECT_TRUE(iter != hash_table->End()); ValidateMatch(probe_row, iter.GetRow()); } else { - EXPECT_TRUE(iter == hash_table.End()); + EXPECT_TRUE(iter == hash_table->End()); } } - hash_table.Close(); + hash_table->Close(nullptr); mem_pool_.FreeAll(); }
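For reference, the old-hash-table-test.cc hunks above capture the whole lifecycle of the new expression API: a ScalarExpr is built and Init()'d once against a RowDescriptor, ScalarExprEvaluator instances are then created from it, opened, queried with GetValue(), and finally the two layers are closed separately. The condensed sketch below restates that sequence on its own; it only reuses calls visible in this patch, and the fixture members it references (pool_, mem_pool_, desc, build_exprs_, build_expr_evals_, and a TupleRow* row) are assumed to be set up as in the test.

  // Build the shared, state-free expression tree once.
  ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 0));
  ASSERT_OK(build_expr->Init(desc, nullptr));  // resolve the slot against the row descriptor
  build_exprs_.push_back(build_expr);

  // Per-instance evaluators carry the runtime state that ExprContext used to hold.
  ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_,
      &build_expr_evals_));
  ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr));

  // Evaluation now goes through the evaluator, not the expr.
  int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row));

  // Teardown is two-step: close the evaluators first, then the exprs.
  ScalarExprEvaluator::Close(build_expr_evals_, nullptr);
  ScalarExpr::Close(build_exprs_);

The same Create()/Open()/Close() pairing is what the tests now apply to the table itself: OldHashTable::Create() plus hash_table->Open(nullptr) replace the old constructor, and hash_table->Close(nullptr) replaces the argument-less Close().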

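The nested-loop-join hunks earlier in the patch all follow one calling convention for the migrated conjuncts: the node keeps the analyzed exprs (join_conjuncts_, conjuncts_) next to their evaluators (join_conjunct_evals_, conjunct_evals_), and each loop caches a raw evaluator pointer plus a count taken from the expr vector, with a DCHECK_EQ documenting that the two vectors stay in sync before any rows are evaluated. Below is a minimal sketch of that pattern, modeled on the right-anti-join hunk; it assumes the surrounding member state (build_row_iterator_, current_probe_row_, semi_join_staging_row_, matching_build_rows_, current_build_row_idx_) and leaves out the periodic limit/cancellation/QueryMaintenance() checks that the real loops also perform every N iterations.

  ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
  size_t num_join_conjuncts = join_conjuncts_.size();
  DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());

  while (!build_row_iterator_.AtEnd()) {
    // Materialize the combined probe/build row, then run the join conjuncts on it.
    CreateOutputRow(semi_join_staging_row_, current_probe_row_,
        build_row_iterator_.GetRow());
    if (EvalConjuncts(
            join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
      matching_build_rows_->Set(current_build_row_idx_, true);
    }
    build_row_iterator_.Next();
    ++current_build_row_idx_;
  }

Taking the count from join_conjuncts_ rather than from the evaluator vector is consistent across all of the loops touched above (semi, anti, unmatched-probe, unmatched-build, FindBuildMatches); the DCHECK_EQ simply records the invariant that one evaluator exists per conjunct.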