http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 8de7c63..a94359b 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -31,7 +31,7 @@ #include "exec/parquet-column-readers.h" #include "exec/parquet-column-stats.h" #include "exec/scanner-context.inline.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" #include "runtime/collection-value-builder.h" #include "runtime/descriptors.h" #include "runtime/runtime-state.h" @@ -191,8 +191,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { num_dict_filtered_row_groups_counter_ = ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT); process_footer_timer_stats_ = - ADD_SUMMARY_STATS_TIMER( - scan_node_->runtime_profile(), "FooterProcessingTime"); + ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "FooterProcessingTime"); codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>( scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET)); @@ -209,15 +208,16 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { if (min_max_tuple_desc != nullptr) { int64_t tuple_size = min_max_tuple_desc->byte_size(); if (!min_max_tuple_buffer_.TryAllocate(tuple_size)) { - return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet " - "statistics tuple for file '$1'.", tuple_size, filename())); + string details = Substitute("Could not allocate buffer of $0 bytes for Parquet " + "statistics tuple for file '$1'.", tuple_size, filename()); + return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size); } } // Clone the min/max statistics conjuncts. - RETURN_IF_ERROR(Expr::CloneIfNotExists(scan_node_->min_max_conjunct_ctxs(), - state_, &min_max_conjuncts_ctxs_)); - min_max_conjuncts_ctxs_to_eval_.reserve(min_max_conjuncts_ctxs_.size()); + RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_, + expr_mem_pool_.get(), scan_node_->min_max_conjunct_evals(), + &min_max_conjunct_evals_)); for (int i = 0; i < context->filter_ctxs().size(); ++i) { const FilterContext* ctx = &context->filter_ctxs()[i]; @@ -318,7 +318,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) { if (schema_resolver_.get() != nullptr) schema_resolver_.reset(); - Expr::Close(min_max_conjuncts_ctxs_, state_); + ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_); for (int i = 0; i < filter_ctxs_.size(); ++i) { const FilterStats* stats = filter_ctxs_[i]->stats; @@ -505,12 +505,10 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts( Tuple* min_max_tuple = reinterpret_cast<Tuple*>(min_max_tuple_buffer_.buffer()); min_max_tuple->Init(tuple_size); - DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjuncts_ctxs_.size()); - - min_max_conjuncts_ctxs_to_eval_.clear(); - for (int i = 0; i < min_max_conjuncts_ctxs_.size(); ++i) { + DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size()); + for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) { SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i]; - ExprContext* conjunct = min_max_conjuncts_ctxs_[i]; + ScalarExprEvaluator* eval = min_max_conjunct_evals_[i]; // Resolve column path to determine col idx. 
SchemaNode* node = nullptr; @@ -548,7 +546,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts( const ColumnType& col_type = slot_desc->type(); bool stats_read = false; void* slot = min_max_tuple->GetSlot(slot_desc->tuple_offset()); - const string& fn_name = conjunct->root()->function_name(); + const string& fn_name = eval->root().function_name(); if (fn_name == "lt" || fn_name == "le") { // We need to get min stats. stats_read = ColumnStatsBase::ReadFromThrift( @@ -561,18 +559,15 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts( DCHECK(false) << "Unsupported function name for statistics evaluation: " << fn_name; } - if (stats_read) min_max_conjuncts_ctxs_to_eval_.push_back(conjunct); - } - - if (!min_max_conjuncts_ctxs_to_eval_.empty()) { - TupleRow row; - row.SetTuple(0, min_max_tuple); - if (!ExecNode::EvalConjuncts(&min_max_conjuncts_ctxs_to_eval_[0], - min_max_conjuncts_ctxs_to_eval_.size(), &row)) { - *skip_row_group = true; + if (stats_read) { + TupleRow row; + row.SetTuple(0, min_max_tuple); + if (!ExecNode::EvalPredicate(eval, &row)) { + *skip_row_group = true; + return Status::OK(); + } } } - return Status::OK(); } @@ -724,8 +719,8 @@ bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) { // rather than materializing the values. if (!slot_desc) return false; // Does this column reader have any dictionary filter conjuncts? - auto dict_filter_it = scanner_dict_filter_map_.find(slot_desc->id()); - if (dict_filter_it == scanner_dict_filter_map_.end()) return false; + auto dict_filter_it = dict_filter_map_.find(slot_desc->id()); + if (dict_filter_it == dict_filter_map_.end()) return false; // Certain datatypes (chars, timestamps) do not have the appropriate value in the // file format and must be converted before return. This is true for the @@ -875,9 +870,10 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue; const SlotDescriptor* slot_desc = scalar_reader->slot_desc(); - auto dict_filter_it = scanner_dict_filter_map_.find(slot_desc->id()); - DCHECK(dict_filter_it != scanner_dict_filter_map_.end()); - vector<ExprContext*>& dict_filter_conjunct_ctxs = dict_filter_it->second; + auto dict_filter_it = dict_filter_map_.find(slot_desc->id()); + DCHECK(dict_filter_it != dict_filter_map_.end()); + const vector<ScalarExprEvaluator*>& dict_filter_conjunct_evals = + dict_filter_it->second; void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset()); bool column_has_match = false; for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) { @@ -887,8 +883,8 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr // If any dictionary value passes the conjuncts, then move on to the next column. TupleRow row; row.SetTuple(0, dict_filter_tuple); - if (ExecNode::EvalConjuncts(dict_filter_conjunct_ctxs.data(), - dict_filter_conjunct_ctxs.size(), &row)) { + if (ExecNode::EvalConjuncts(dict_filter_conjunct_evals.data(), + dict_filter_conjunct_evals.size(), &row)) { column_has_match = true; break; } @@ -998,8 +994,8 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) { // with parse_status_. 
RETURN_IF_ERROR(state_->GetQueryStatus()); // Free local expr allocations for this thread - for (const auto& kv: scanner_conjuncts_map_) { - ExprContext::FreeLocalAllocations(kv.second); + for (const auto& kv: conjunct_evals_map_) { + ScalarExprEvaluator::FreeLocalAllocations(kv.second); } return Status::OK(); } @@ -1018,7 +1014,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { // output batch per remaining scratch tuple and return. No need to evaluate // filters/conjuncts. DCHECK(filter_ctxs_.empty()); - DCHECK(scanner_conjunct_ctxs_->empty()); + DCHECK(conjunct_evals_->empty()); int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(), scratch_batch_->num_tuples - scratch_batch_->tuple_idx); memset(output_row, 0, num_tuples * sizeof(Tuple*)); @@ -1040,8 +1036,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { } Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node, - const vector<ExprContext*>& conjunct_ctxs, const vector<FilterContext>& filter_ctxs, - Function** process_scratch_batch_fn) { + const vector<ScalarExpr*>& conjuncts, Function** process_scratch_batch_fn) { DCHECK(node->runtime_state()->ShouldCodegen()); *process_scratch_batch_fn = NULL; LlvmCodeGen* codegen = node->runtime_state()->codegen(); @@ -1052,8 +1047,7 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node, DCHECK(fn != NULL); Function* eval_conjuncts_fn; - RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs, - &eval_conjuncts_fn)); + RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn)); DCHECK(eval_conjuncts_fn != NULL); int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts"); @@ -1061,7 +1055,7 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node, Function* eval_runtime_filters_fn; RETURN_IF_ERROR(CodegenEvalRuntimeFilters( - codegen, filter_ctxs, &eval_runtime_filters_fn)); + codegen, node->filter_exprs(), &eval_runtime_filters_fn)); DCHECK(eval_runtime_filters_fn != NULL); replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters"); @@ -1101,11 +1095,11 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) { // EvalRuntimeFilter() is the same as the cross-compiled version except EvalOneFilter() // is replaced with the one generated by CodegenEvalOneFilter(). 
Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen, - const vector<FilterContext>& filter_ctxs, Function** fn) { + const vector<ScalarExpr*>& filter_exprs, Function** fn) { LLVMContext& context = codegen->context(); LlvmBuilder builder(context); - *fn = NULL; + *fn = nullptr; Type* this_type = codegen->GetPtrType(HdfsParquetScanner::LLVM_CLASS_NAME); PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME); LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters", @@ -1118,7 +1112,7 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen, Value* this_arg = args[0]; Value* row_arg = args[1]; - int num_filters = filter_ctxs.size(); + int num_filters = filter_exprs.size(); if (num_filters == 0) { builder.CreateRet(codegen->true_value()); } else { @@ -1130,13 +1124,15 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen, for (int i = 0; i < num_filters; ++i) { Function* eval_runtime_filter_fn = codegen->GetFunction(IRFunction::PARQUET_SCANNER_EVAL_RUNTIME_FILTER, true); - DCHECK(eval_runtime_filter_fn != NULL); + DCHECK(eval_runtime_filter_fn != nullptr); // Codegen function for inlining filter's expression evaluation and constant fold // the type of the expression into the hashing function to avoid branches. Function* eval_one_filter_fn; - RETURN_IF_ERROR(filter_ctxs[i].CodegenEval(codegen, &eval_one_filter_fn)); - DCHECK(eval_one_filter_fn != NULL); + DCHECK(filter_exprs[i] != nullptr); + RETURN_IF_ERROR(FilterContext::CodegenEval(codegen, filter_exprs[i], + &eval_one_filter_fn)); + DCHECK(eval_one_filter_fn != nullptr); int replaced = codegen->ReplaceCallSites(eval_runtime_filter_fn, eval_one_filter_fn, "FilterContext4Eval"); @@ -1163,7 +1159,7 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen, } *fn = codegen->FinalizeFunction(eval_runtime_filters_fn); - if (*fn == NULL) { + if (*fn == nullptr) { return Status("Codegen'd HdfsParquetScanner::EvalRuntimeFilters() failed " "verification, see log"); } @@ -1179,7 +1175,8 @@ bool HdfsParquetScanner::AssembleCollection( const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc(); Tuple* template_tuple = template_tuple_map_[tuple_desc]; - const vector<ExprContext*> conjunct_ctxs = scanner_conjuncts_map_[tuple_desc->id()]; + const vector<ScalarExprEvaluator*> evals = + conjunct_evals_map_[tuple_desc->id()]; int64_t rows_read = 0; bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled(); @@ -1227,7 +1224,7 @@ bool HdfsParquetScanner::AssembleCollection( end_of_collection = column_readers[0]->rep_level() <= new_collection_rep_level; if (materialize_tuple) { - if (ExecNode::EvalConjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) { + if (ExecNode::EvalConjuncts(&evals[0], evals.size(), row)) { tuple = next_tuple(tuple_desc->byte_size(), tuple); ++num_to_commit; } @@ -1345,8 +1342,9 @@ Status HdfsParquetScanner::ProcessFooter() { } if (!metadata_buffer.TryAllocate(metadata_size)) { - return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet " - "metadata for file '$1'.", metadata_size, filename())); + string details = Substitute("Could not allocate buffer of $0 bytes for Parquet " + "metadata for file '$1'.", metadata_size, filename()); + return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, metadata_size); } metadata_ptr = metadata_buffer.buffer(); DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
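The hdfs-parquet-scanner.cc change above replaces the batched min/max conjunct evaluation with a per-conjunct check: each statistics conjunct is evaluated on its own against a tuple holding the column's min or max value, and the row group is skipped as soon as one conjunct fails. The following self-contained sketch models that pruning logic only; the names (StatsConjunct, ColumnStats, CanSkipRowGroup) are illustrative stand-ins, not Impala's ScalarExprEvaluator or ColumnStatsBase interfaces.

// Simplified model of per-conjunct row-group pruning: each statistics conjunct is
// tested against the column's min or max value; one failing conjunct is enough to
// skip the whole row group. Illustrative types only, not Impala classes.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct StatsConjunct {
  std::string fn_name;                     // "lt", "le", "gt", "ge"
  std::function<bool(int64_t)> predicate;  // stands in for a ScalarExprEvaluator
};

struct ColumnStats {
  bool has_stats;
  int64_t min_value;
  int64_t max_value;
};

// Returns true if the row group can be skipped based on min/max statistics.
bool CanSkipRowGroup(const std::vector<StatsConjunct>& conjuncts,
                     const std::vector<ColumnStats>& stats) {
  for (size_t i = 0; i < conjuncts.size(); ++i) {
    if (!stats[i].has_stats) continue;  // stats missing: cannot prune on this conjunct
    // "<" / "<=" conjuncts are probed with the minimum, ">" / ">=" with the maximum.
    int64_t probe = (conjuncts[i].fn_name == "lt" || conjuncts[i].fn_name == "le")
        ? stats[i].min_value : stats[i].max_value;
    if (!conjuncts[i].predicate(probe)) return true;  // no row in the group can pass
  }
  return false;
}

int main() {
  // One conjunct: col < 10, with row-group min = 42 -> the group is skippable.
  std::vector<StatsConjunct> conjuncts = {{"lt", [](int64_t v) { return v < 10; }}};
  std::vector<ColumnStats> stats = {{true, 42, 100}};
  std::cout << (CanSkipRowGroup(conjuncts, stats) ? "skip" : "scan") << "\n";
  return 0;
}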
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 59a06bc..ad86dc8 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -337,8 +337,7 @@ class HdfsParquetScanner : public HdfsScanner { /// Codegen ProcessScratchBatch(). Stores the resulting function in /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise. static Status Codegen(HdfsScanNodeBase* node, - const std::vector<ExprContext*>& conjunct_ctxs, - const std::vector<FilterContext>& filter_ctxs, + const std::vector<ScalarExpr*>& conjuncts, llvm::Function** process_scratch_batch_fn); /// The repetition level is set to this value to indicate the end of a row group. @@ -379,13 +378,9 @@ class HdfsParquetScanner : public HdfsScanner { /// Buffer to back tuples when reading parquet::Statistics. ScopedBuffer min_max_tuple_buffer_; - /// Min/max statistics contexts, owned by HdfsScanner::state_->obj_pool_. - vector<ExprContext*> min_max_conjuncts_ctxs_; - - /// Used in EvaluateRowGroupStats() to store non-owning copies of conjunct pointers from - /// 'min_max_conjunct_ctxs_'. It is declared here to avoid the dynamic allocation - /// overhead. - vector<ExprContext*> min_max_conjuncts_ctxs_to_eval_; + /// Clone of Min/max statistics conjunct evaluators. Has the same life time as + /// the scanner. Stored in 'obj_pool_'. + vector<ScalarExprEvaluator*> min_max_conjunct_evals_; /// Cached runtime filter contexts, one for each filter that applies to this column, /// owned by instances of this class. @@ -544,7 +539,7 @@ class HdfsParquetScanner : public HdfsScanner { /// 'filter_ctxs'. Return error status on failure. The generated function is returned /// via 'fn'. static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen, - const std::vector<FilterContext>& filter_ctxs, llvm::Function** fn); + const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn); /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue /// allocated from 'coll_value_builder'. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index a3aa4d3..cc62ef8 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -20,8 +20,8 @@ #include "common/version.h" #include "exec/hdfs-table-sink.h" #include "exec/parquet-column-stats.inline.h" -#include "exprs/expr-context.h" -#include "exprs/expr.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "rpc/thrift-util.h" #include "runtime/decimal-value.h" #include "runtime/mem-tracker.h" @@ -88,10 +88,10 @@ namespace impala { class HdfsParquetTableWriter::BaseColumnWriter { public: // expr - the expression to generate output values for this column. 
- BaseColumnWriter(HdfsParquetTableWriter* parent, ExprContext* expr_ctx, + BaseColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* expr_eval, const THdfsCompression::type& codec) : parent_(parent), - expr_ctx_(expr_ctx), + expr_eval_(expr_eval), codec_(codec), page_size_(DEFAULT_DATA_PAGE_SIZE), current_page_(nullptr), @@ -172,7 +172,7 @@ class HdfsParquetTableWriter::BaseColumnWriter { if (dict_encoder_base_ != nullptr) dict_encoder_base_->ClearIndices(); } - const ColumnType& type() const { return expr_ctx_->root()->type(); } + const ColumnType& type() const { return expr_eval_->root().type(); } uint64_t num_values() const { return num_values_; } uint64_t total_compressed_size() const { return total_compressed_byte_size_; } uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; } @@ -222,7 +222,7 @@ class HdfsParquetTableWriter::BaseColumnWriter { }; HdfsParquetTableWriter* parent_; - ExprContext* expr_ctx_; + ScalarExprEvaluator* expr_eval_; THdfsCompression::type codec_; @@ -285,13 +285,13 @@ template<typename T> class HdfsParquetTableWriter::ColumnWriter : public HdfsParquetTableWriter::BaseColumnWriter { public: - ColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx, + ColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval, const THdfsCompression::type& codec) - : BaseColumnWriter(parent, ctx, codec), + : BaseColumnWriter(parent, eval, codec), num_values_since_dict_size_check_(0), plain_encoded_value_size_( - ParquetPlainEncoder::EncodedByteSize(ctx->root()->type())) { - DCHECK_NE(ctx->root()->type().type, TYPE_BOOLEAN); + ParquetPlainEncoder::EncodedByteSize(eval->root().type())) { + DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN); } virtual void Reset() { @@ -398,12 +398,12 @@ inline StringValue* HdfsParquetTableWriter::ColumnWriter<StringValue>::CastValue class HdfsParquetTableWriter::BoolColumnWriter : public HdfsParquetTableWriter::BaseColumnWriter { public: - BoolColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx, + BoolColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval, const THdfsCompression::type& codec) - : BaseColumnWriter(parent, ctx, codec), + : BaseColumnWriter(parent, eval, codec), page_stats_(parent_->reusable_col_mem_pool_.get(), -1), row_group_stats_(parent_->reusable_col_mem_pool_.get(), -1) { - DCHECK_EQ(ctx->root()->type().type, TYPE_BOOLEAN); + DCHECK_EQ(eval->root().type().type, TYPE_BOOLEAN); bool_values_ = parent_->state_->obj_pool()->Add( new BitWriter(values_buffer_, values_buffer_len_)); // Dictionary encoding doesn't make sense for bools and is not allowed by @@ -449,7 +449,7 @@ class HdfsParquetTableWriter::BoolColumnWriter : inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) { ++num_values_; - void* value = expr_ctx_->GetValue(row); + void* value = expr_eval_->GetValue(row); if (current_page_ == nullptr) NewPage(); // Ensure that we have enough space for the definition level, but don't write it yet in @@ -731,8 +731,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() { HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state, OutputPartition* output, const HdfsPartitionDescriptor* part_desc, - const HdfsTableDescriptor* table_desc, const vector<ExprContext*>& output_expr_ctxs) - : HdfsTableWriter(parent, state, output, part_desc, table_desc, output_expr_ctxs), + const HdfsTableDescriptor* table_desc) + : HdfsTableWriter(parent, state, output, part_desc, table_desc), 
thrift_serializer_(new ThriftSerializer(true)), current_row_group_(nullptr), row_count_(0), @@ -774,59 +774,51 @@ Status HdfsParquetTableWriter::Init() { // Initialize each column structure. for (int i = 0; i < columns_.size(); ++i) { BaseColumnWriter* writer = nullptr; - const ColumnType& type = output_expr_ctxs_[i]->root()->type(); + const ColumnType& type = output_expr_evals_[i]->root().type(); switch (type.type) { case TYPE_BOOLEAN: - writer = new BoolColumnWriter( - this, output_expr_ctxs_[i], codec); + writer = new BoolColumnWriter(this, output_expr_evals_[i], codec); break; case TYPE_TINYINT: - writer = new ColumnWriter<int8_t>( - this, output_expr_ctxs_[i], codec); + writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec); break; case TYPE_SMALLINT: - writer = new ColumnWriter<int16_t>( - this, output_expr_ctxs_[i], codec); + writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec); break; case TYPE_INT: - writer = new ColumnWriter<int32_t>( - this, output_expr_ctxs_[i], codec); + writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec); break; case TYPE_BIGINT: - writer = new ColumnWriter<int64_t>( - this, output_expr_ctxs_[i], codec); + writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec); break; case TYPE_FLOAT: - writer = new ColumnWriter<float>( - this, output_expr_ctxs_[i], codec); + writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec); break; case TYPE_DOUBLE: - writer = new ColumnWriter<double>( - this, output_expr_ctxs_[i], codec); + writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec); break; case TYPE_TIMESTAMP: writer = new ColumnWriter<TimestampValue>( - this, output_expr_ctxs_[i], codec); + this, output_expr_evals_[i], codec); break; case TYPE_VARCHAR: case TYPE_STRING: case TYPE_CHAR: - writer = new ColumnWriter<StringValue>( - this, output_expr_ctxs_[i], codec); + writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec); break; case TYPE_DECIMAL: - switch (output_expr_ctxs_[i]->root()->type().GetByteSize()) { + switch (output_expr_evals_[i]->root().type().GetByteSize()) { case 4: writer = new ColumnWriter<Decimal4Value>( - this, output_expr_ctxs_[i], codec); + this, output_expr_evals_[i], codec); break; case 8: writer = new ColumnWriter<Decimal8Value>( - this, output_expr_ctxs_[i], codec); + this, output_expr_evals_[i], codec); break; case 16: writer = new ColumnWriter<Decimal16Value>( - this, output_expr_ctxs_[i], codec); + this, output_expr_evals_[i], codec); break; default: DCHECK(false); @@ -852,10 +844,10 @@ Status HdfsParquetTableWriter::CreateSchema() { for (int i = 0; i < columns_.size(); ++i) { parquet::SchemaElement& node = file_metadata_.schema[i + 1]; + const ColumnType& type = output_expr_evals_[i]->root().type(); node.name = table_desc_->col_descs()[i + num_clustering_cols].name(); - node.__set_type(IMPALA_TO_PARQUET_TYPES[output_expr_ctxs_[i]->root()->type().type]); + node.__set_type(IMPALA_TO_PARQUET_TYPES[type.type]); node.__set_repetition_type(FieldRepetitionType::OPTIONAL); - const ColumnType& type = output_expr_ctxs_[i]->root()->type(); if (type.type == TYPE_DECIMAL) { // This column is type decimal. 
Update the file metadata to include the // additional fields: @@ -864,9 +856,9 @@ Status HdfsParquetTableWriter::CreateSchema() { // 3) precision/scale node.__set_converted_type(ConvertedType::DECIMAL); node.__set_type_length( - ParquetPlainEncoder::DecimalSize(output_expr_ctxs_[i]->root()->type())); - node.__set_scale(output_expr_ctxs_[i]->root()->type().scale); - node.__set_precision(output_expr_ctxs_[i]->root()->type().precision); + ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type())); + node.__set_scale(output_expr_evals_[i]->root().type().scale); + node.__set_precision(output_expr_evals_[i]->root().type().precision); } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR || (type.type == TYPE_STRING && state_->query_options().parquet_annotate_strings_utf8)) { @@ -887,7 +879,7 @@ Status HdfsParquetTableWriter::AddRowGroup() { current_row_group_->columns.resize(columns_.size()); for (int i = 0; i < columns_.size(); ++i) { ColumnMetaData metadata; - metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->expr_ctx_->root()->type().type]; + metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->type().type]; metadata.path_in_schema.push_back( table_desc_->col_descs()[i + num_clustering_cols].name()); metadata.codec = columns_[i]->codec(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h index 8cbd456..b3d319e 100644 --- a/be/src/exec/hdfs-parquet-table-writer.h +++ b/be/src/exec/hdfs-parquet-table-writer.h @@ -51,11 +51,9 @@ class TupleRow; class HdfsParquetTableWriter : public HdfsTableWriter { public: - HdfsParquetTableWriter(HdfsTableSink* parent, - RuntimeState* state, OutputPartition* output_partition, - const HdfsPartitionDescriptor* part_desc, - const HdfsTableDescriptor* table_desc, - const std::vector<ExprContext*>& output_expr_ctxs); + HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state, + OutputPartition* output_partition, const HdfsPartitionDescriptor* part_desc, + const HdfsTableDescriptor* table_desc); ~HdfsParquetTableWriter(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-rcfile-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc index eb78fb9..33b62dd 100644 --- a/be/src/exec/hdfs-rcfile-scanner.cc +++ b/be/src/exec/hdfs-rcfile-scanner.cc @@ -23,7 +23,6 @@ #include "exec/hdfs-sequence-scanner.h" #include "exec/scanner-context.inline.h" #include "exec/text-converter.inline.h" -#include "exprs/expr.h" #include "runtime/descriptors.h" #include "runtime/runtime-state.h" #include "runtime/mem-pool.h" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 0e7c319..eeb7cfe 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -33,7 +33,8 @@ #include "codegen/llvm-codegen.h" #include "common/logging.h" #include "common/object-pool.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" +#include "exprs/scalar-expr-evaluator.h" #include "runtime/descriptors.h" #include "runtime/hdfs-fs-cache.h" #include 
"runtime/runtime-filter.inline.h" @@ -80,14 +81,18 @@ const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC = const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024; HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs) : ScanNode(pool, tnode, descs), min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ? tnode.hdfs_scan_node.min_max_tuple_id : -1), skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ? tnode.hdfs_scan_node.skip_header_line_count : 0), tuple_id_(tnode.hdfs_scan_node.tuple_id), - disks_accessed_bitmap_(TUnit::UNIT, 0){ + tuple_desc_(descs.GetTupleDescriptor(tuple_id_)), + thrift_dict_filter_conjuncts_map_( + tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ? + &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr), + disks_accessed_bitmap_(TUnit::UNIT, 0) { } HdfsScanNodeBase::~HdfsScanNodeBase() { @@ -98,16 +103,21 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { // Add collection item conjuncts for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) { - DCHECK(conjuncts_map_[entry.first].empty()); - RETURN_IF_ERROR( - Expr::CreateExprTrees(pool_, entry.second, &conjuncts_map_[entry.first])); + TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first); + RowDescriptor* collection_row_desc = state->obj_pool()->Add( + new RowDescriptor(tuple_desc, /* is_nullable */ false)); + DCHECK(conjuncts_map_.find(entry.first) == conjuncts_map_.end()); + RETURN_IF_ERROR(ScalarExpr::Create(entry.second, *collection_row_desc, state, + &conjuncts_map_[entry.first])); } + DCHECK(conjuncts_map_[tuple_id_].empty()); + conjuncts_map_[tuple_id_] = conjuncts_; const TQueryOptions& query_options = state->query_options(); - for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) { - auto it = filter.planid_to_target_ndx.find(tnode.node_id); - DCHECK(it != filter.planid_to_target_ndx.end()); - const TRuntimeFilterTargetDesc& target = filter.targets[it->second]; + for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) { + auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id); + DCHECK(it != filter_desc.planid_to_target_ndx.end()); + const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second]; if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL && !target.is_local_target) { continue; @@ -116,40 +126,32 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) { !target.is_bound_by_partition_columns) { continue; } - - FilterContext filter_ctx; + ScalarExpr* filter_expr; RETURN_IF_ERROR( - Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr_ctx)); - filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false); - - string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id, + ScalarExpr::Create(target.target_expr, row_desc(), state, &filter_expr)); + filter_exprs_.push_back(filter_expr); + + // TODO: Move this to Prepare() + filter_ctxs_.emplace_back(); + FilterContext& filter_ctx = filter_ctxs_.back(); + filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false); + string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id, PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES)); RuntimeProfile* profile = state->obj_pool()->Add( new RuntimeProfile(state->obj_pool(), filter_profile_title)); 
runtime_profile_->AddChild(profile); filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile, target.is_bound_by_partition_columns)); - - filter_ctxs_.push_back(filter_ctx); } - // Add row batch conjuncts - DCHECK(conjuncts_map_[tuple_id_].empty()); - conjuncts_map_[tuple_id_] = conjunct_ctxs_; - // Add min max conjuncts - RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, tnode.hdfs_scan_node.min_max_conjuncts, - &min_max_conjunct_ctxs_)); - DCHECK(min_max_conjunct_ctxs_.empty() == (min_max_tuple_id_ == -1)); - - for (const auto& entry: tnode.hdfs_scan_node.dictionary_filter_conjuncts) { - // Convert this slot's list of conjunct indices into a list of pointers - // into conjunct_ctxs_. - for (int conjunct_idx : entry.second) { - DCHECK_LT(conjunct_idx, conjunct_ctxs_.size()); - ExprContext* conjunct_ctx = conjunct_ctxs_[conjunct_idx]; - dict_filter_conjuncts_map_[entry.first].push_back(conjunct_ctx); - } + if (min_max_tuple_id_ != -1) { + min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_); + DCHECK(min_max_tuple_desc_ != nullptr); + RowDescriptor* min_max_row_desc = state->obj_pool()->Add( + new RowDescriptor(min_max_tuple_desc_, /* is_nullable */ false)); + RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts, + *min_max_row_desc, state, &min_max_conjuncts_)); } return Status::OK(); @@ -161,41 +163,37 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { runtime_state_ = state; RETURN_IF_ERROR(ScanNode::Prepare(state)); - tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); - DCHECK(tuple_desc_ != NULL); - // Prepare collection conjuncts for (const auto& entry: conjuncts_map_) { TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first); // conjuncts_ are already prepared in ExecNode::Prepare(), don't try to prepare again - if (tuple_desc == tuple_desc_) continue; - RowDescriptor* collection_row_desc = - state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false)); - RETURN_IF_ERROR( - Expr::Prepare(entry.second, state, *collection_row_desc, expr_mem_tracker())); + if (tuple_desc == tuple_desc_) { + conjunct_evals_map_[entry.first] = conjunct_evals(); + } else { + DCHECK(conjunct_evals_map_[entry.first].empty()); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(entry.second, state, pool_, + expr_mem_pool(), &conjunct_evals_map_[entry.first])); + } + } + + DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size()); + for (int i = 0; i < filter_exprs_.size(); ++i) { + RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, pool_, + expr_mem_pool(), &filter_ctxs_[i].expr_eval)); + AddEvaluatorToFree(filter_ctxs_[i].expr_eval); } // Prepare min max statistics conjuncts. 
if (min_max_tuple_id_ != -1) { - min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_); - DCHECK(min_max_tuple_desc_ != NULL); - RowDescriptor* min_max_row_desc = - state->obj_pool()->Add(new RowDescriptor(min_max_tuple_desc_, /* is_nullable */ - false)); - RETURN_IF_ERROR(Expr::Prepare(min_max_conjunct_ctxs_, state, *min_max_row_desc, - expr_mem_tracker())); + RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_, + expr_mem_pool(), &min_max_conjunct_evals_)); } - // One-time initialisation of state that is constant across scan ranges + // One-time initialization of state that is constant across scan ranges DCHECK(tuple_desc_->table_desc() != NULL); hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc()); scan_node_pool_.reset(new MemPool(mem_tracker())); - for (FilterContext& filter: filter_ctxs_) { - RETURN_IF_ERROR(filter.expr_ctx->Prepare(state, row_desc(), expr_mem_tracker())); - AddExprCtxToFree(filter.expr_ctx); - } - // Parse Avro table schema if applicable const string& avro_schema_str = hdfs_table_->avro_schema(); if (!avro_schema_str.empty()) { @@ -331,16 +329,16 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) { Status status; switch (format) { case THdfsFileFormat::TEXT: - status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn); + status = HdfsTextScanner::Codegen(this, conjuncts_, &fn); break; case THdfsFileFormat::SEQUENCE_FILE: - status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn); + status = HdfsSequenceScanner::Codegen(this, conjuncts_, &fn); break; case THdfsFileFormat::AVRO: - status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn); + status = HdfsAvroScanner::Codegen(this, conjuncts_, &fn); break; case THdfsFileFormat::PARQUET: - status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, filter_ctxs_, &fn); + status = HdfsParquetScanner::Codegen(this, conjuncts_, &fn); break; default: // No codegen for this format @@ -363,16 +361,19 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::Open(state)); // Open collection conjuncts - for (const auto& entry: conjuncts_map_) { + for (auto& entry: conjunct_evals_map_) { // conjuncts_ are already opened in ExecNode::Open() if (entry.first == tuple_id_) continue; - RETURN_IF_ERROR(Expr::Open(entry.second, state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(entry.second, state)); } // Open min max conjuncts - RETURN_IF_ERROR(Expr::Open(min_max_conjunct_ctxs_, state)); + RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state)); - for (FilterContext& filter: filter_ctxs_) RETURN_IF_ERROR(filter.expr_ctx->Open(state)); + // Open Runtime filter expressions. + for (FilterContext& ctx : filter_ctxs_) { + RETURN_IF_ERROR(ctx.expr_eval->Open(state)); + } // Create template tuples for all partitions. 
for (int64_t partition_id: partition_ids_) { @@ -381,7 +382,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) { << " partition_id=" << partition_id << "\n" << PrintThrift(state->instance_ctx()); partition_template_tuple_map_[partition_id] = InitTemplateTuple( - partition_desc->partition_key_value_ctxs(), scan_node_pool_.get(), state); + partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state); } runtime_state_->io_mgr()->RegisterContext(&reader_context_, mem_tracker()); @@ -476,15 +477,22 @@ void HdfsScanNodeBase::Close(RuntimeState* state) { if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll(); // Close collection conjuncts - for (const auto& tid_conjunct: conjuncts_map_) { + for (auto& tid_conjunct: conjuncts_map_) { // conjuncts_ are already closed in ExecNode::Close() if (tid_conjunct.first == tuple_id_) continue; - Expr::Close(tid_conjunct.second, state); + ScalarExprEvaluator::Close(conjunct_evals_map_[tid_conjunct.first], state); + ScalarExpr::Close(tid_conjunct.second); } - Expr::Close(min_max_conjunct_ctxs_, state); + // Close min max conjunct + ScalarExprEvaluator::Close(min_max_conjunct_evals_, state); + ScalarExpr::Close(min_max_conjuncts_); - for (auto& filter_ctx: filter_ctxs_) filter_ctx.expr_ctx->Close(state); + // Close filter + for (auto& filter_ctx : filter_ctxs_) { + if (filter_ctx.expr_eval != nullptr) filter_ctx.expr_eval->Close(state); + } + ScalarExpr::Close(filter_exprs_); ScanNode::Close(state); } @@ -529,8 +537,9 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) { return Status::OK(); } -bool HdfsScanNodeBase::FilePassesFilterPredicates(const vector<FilterContext>& filter_ctxs, - const THdfsFileFormat::type& format, HdfsFileDesc* file) { +bool HdfsScanNodeBase::FilePassesFilterPredicates( + const vector<FilterContext>& filter_ctxs, const THdfsFileFormat::type& format, + HdfsFileDesc* file) { #ifndef NDEBUG if (FLAGS_skip_file_runtime_filtering) return true; #endif @@ -689,15 +698,15 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition return status; } -Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ExprContext*>& value_ctxs, +Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals, MemPool* pool, RuntimeState* state) const { if (partition_key_slots_.empty()) return NULL; Tuple* template_tuple = Tuple::Create(tuple_desc_->byte_size(), pool); for (int i = 0; i < partition_key_slots_.size(); ++i) { const SlotDescriptor* slot_desc = partition_key_slots_[i]; - ExprContext* value_ctx = value_ctxs[slot_desc->col_pos()]; + ScalarExprEvaluator* eval = evals[slot_desc->col_pos()]; // Exprs guaranteed to be literals, so can safely be evaluated without a row. 
- RawValue::Write(value_ctx->GetValue(NULL), template_tuple, slot_desc, NULL); + RawValue::Write(eval->GetValue(NULL), template_tuple, slot_desc, NULL); } return template_tuple; } @@ -775,7 +784,7 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type, } void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const { - const vector<ExprContext*>& conjuncts = ExecNode::conjunct_ctxs(); + const vector<ScalarExpr*>& conjuncts = ExecNode::conjuncts(); // Initialize all order to be conjuncts.size() (after the last conjunct) order->insert(order->begin(), materialized_slots().size(), conjuncts.size()); @@ -784,7 +793,7 @@ void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const vector<SlotId> slot_ids; for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) { slot_ids.clear(); - int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids); + int num_slots = conjuncts[conjunct_idx]->GetSlotIds(&slot_ids); for (int j = 0; j < num_slots; ++j) { SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]); int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index abe0fc6..071f1b6 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -143,8 +143,8 @@ class HdfsScanNodeBase : public ScanNode { int min_max_tuple_id() const { return min_max_tuple_id_; } - const std::vector<ExprContext*> min_max_conjunct_ctxs() const { - return min_max_conjunct_ctxs_; + const std::vector<ScalarExprEvaluator*>& min_max_conjunct_evals() const { + return min_max_conjunct_evals_; } const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; } @@ -155,13 +155,13 @@ class HdfsScanNodeBase : public ScanNode { int skip_header_line_count() const { return skip_header_line_count_; } DiskIoRequestContext* reader_context() { return reader_context_; } - typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap; - const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; } + typedef std::map<TupleId, std::vector<ScalarExprEvaluator*>> ConjunctEvaluatorsMap; + const ConjunctEvaluatorsMap& conjuncts_map() const { return conjunct_evals_map_; } /// Slot Id => Dictionary Filter eligible conjuncts for that slot - typedef std::map<SlotId, std::vector<ExprContext*>> DictFilterConjunctsMap; - const DictFilterConjunctsMap& dict_filter_conjuncts_map() const { - return dict_filter_conjuncts_map_; + typedef std::map<TSlotId, std::vector<int32_t>> TDictFilterConjunctsMap; + const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map() const { + return thrift_dict_filter_conjuncts_map_; } RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() { @@ -227,7 +227,7 @@ class HdfsScanNodeBase : public ScanNode { /// Allocates and initializes a new template tuple allocated from pool with values /// from the partition columns for the current scan range, if any, /// Returns NULL if there are no partition keys slots. - Tuple* InitTemplateTuple(const std::vector<ExprContext*>& value_ctxs, + Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals, MemPool* pool, RuntimeState* state) const; /// Returns the file desc for 'filename'. Returns NULL if filename is invalid. 
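The scan-node changes above follow the broader ExprContext removal: immutable ScalarExpr trees are created once per plan node, while the per-thread ScalarExprEvaluators that carry mutable state are created, opened, and closed separately. Below is a minimal, self-contained model of that split; Expr, Evaluator, and CloneEvaluators are hypothetical names chosen for illustration, not the real Impala classes.

// Minimal sketch of the expr/evaluator split: a shared, immutable expression tree
// plus per-thread evaluators that own their own scratch memory.
#include <cstdint>
#include <memory>
#include <vector>

// Immutable expression tree, safe to share across scanner threads
// (analogue of a ScalarExpr created once during plan-node initialization).
struct Expr {
  int slot_idx;
  int64_t constant;
  bool Eval(const std::vector<int64_t>& row) const { return row[slot_idx] < constant; }
};

// Per-thread evaluation state (analogue of a ScalarExprEvaluator): references the
// shared expr but owns its own scratch memory, so evaluation never touches shared state.
class Evaluator {
 public:
  explicit Evaluator(const Expr* expr) : expr_(expr) {}
  bool GetValue(const std::vector<int64_t>& row) const { return expr_->Eval(row); }
 private:
  const Expr* expr_;           // not owned; shared and immutable
  std::vector<char> scratch_;  // stands in for the evaluator's per-thread MemPool
};

// Each scanner clones its own evaluators over the shared expr trees, mirroring the
// clone-per-scanner pattern used in the patch.
std::vector<std::unique_ptr<Evaluator>> CloneEvaluators(const std::vector<Expr>& exprs) {
  std::vector<std::unique_ptr<Evaluator>> evals;
  for (const Expr& e : exprs) evals.push_back(std::make_unique<Evaluator>(&e));
  return evals;
}

int main() {
  std::vector<Expr> conjuncts = {{0, 10}, {1, 100}};  // slot0 < 10 AND slot1 < 100
  auto evals = CloneEvaluators(conjuncts);            // one set per scanner thread
  std::vector<int64_t> row = {3, 250};
  bool passes = true;
  for (const auto& e : evals) passes &= e->GetValue(row);
  return passes ? 0 : 1;
}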
@@ -293,6 +293,8 @@ class HdfsScanNodeBase : public ScanNode { bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name, const std::vector<FilterContext>& filter_ctxs); + const std::vector<ScalarExpr*>& filter_exprs() const { return filter_exprs_; } + const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; } protected: @@ -305,7 +307,8 @@ class HdfsScanNodeBase : public ScanNode { const int min_max_tuple_id_; /// Conjuncts to evaluate on parquet::Statistics. - vector<ExprContext*> min_max_conjunct_ctxs_; + vector<ScalarExpr*> min_max_conjuncts_; + vector<ScalarExprEvaluator*> min_max_conjunct_evals_; /// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics. TupleDescriptor* min_max_tuple_desc_ = nullptr; @@ -353,10 +356,12 @@ class HdfsScanNodeBase : public ScanNode { /// Conjuncts for each materialized tuple (top-level row batch tuples and collection /// item tuples). Includes a copy of ExecNode.conjuncts_. + typedef std::map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap; ConjunctsMap conjuncts_map_; + ConjunctEvaluatorsMap conjunct_evals_map_; - /// Dictionary filtering eligible conjuncts for each slot - DictFilterConjunctsMap dict_filter_conjuncts_map_; + /// Dictionary filtering eligible conjuncts for each slot. + const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map_; /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on /// the first call to GetNext(). The token manager, in a different thread, will read @@ -374,9 +379,12 @@ class HdfsScanNodeBase : public ScanNode { typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap; PathToSlotIdxMap path_to_materialized_slot_idx_; + /// Expressions to evaluate the input rows for filtering against runtime filters. + std::vector<ScalarExpr*> filter_exprs_; + /// List of contexts for expected runtime filters for this scan node. These contexts are /// cloned by individual scanners to be used in multi-threaded contexts, passed through - /// the per-scanner ScannerContext.. + /// the per-scanner ScannerContext. Correspond to exprs in 'filter_exprs_'. std::vector<FilterContext> filter_ctxs_; /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise> http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 576ec55..4cbf503 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -362,12 +362,15 @@ void HdfsScanNode::ScannerThread() { SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics()); // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the - // scanner for finer-grained filtering. + // scanner for finer-grained filtering. Use a thread-local MemPool for the filter + // contexts as the embedded expression evaluators may allocate from it and MemPool + // is not thread safe. 
+ MemPool filter_mem_pool(expr_mem_tracker()); vector<FilterContext> filter_ctxs; Status filter_status = Status::OK(); for (auto& filter_ctx: filter_ctxs_) { FilterContext filter; - filter_status = filter.CloneFrom(filter_ctx, runtime_state_); + filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool); if (!filter_status.ok()) break; filter_ctxs.push_back(filter); } @@ -386,14 +389,7 @@ void HdfsScanNode::ScannerThread() { // Unlock before releasing the thread token to avoid deadlock in // ThreadTokenAvailableCb(). l.unlock(); - runtime_state_->resource_pool()->ReleaseThreadToken(false); - if (filter_status.ok()) { - for (auto& ctx: filter_ctxs) { - ctx.expr_ctx->FreeLocalAllocations(); - ctx.expr_ctx->Close(runtime_state_); - } - } - return; + goto exit; } } else { // If this is the only scanner thread, it should keep running regardless @@ -458,16 +454,17 @@ void HdfsScanNode::ScannerThread() { break; } } + COUNTER_ADD(&active_scanner_thread_counter_, -1); +exit: + runtime_state_->resource_pool()->ReleaseThreadToken(false); if (filter_status.ok()) { for (auto& ctx: filter_ctxs) { - ctx.expr_ctx->FreeLocalAllocations(); - ctx.expr_ctx->Close(runtime_state_); + ctx.expr_eval->FreeLocalAllocations(); + ctx.expr_eval->Close(runtime_state_); } } - - COUNTER_ADD(&active_scanner_thread_counter_, -1); - runtime_state_->resource_pool()->ReleaseThreadToken(false); + filter_mem_pool.FreeAll(); } namespace { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc index 5b474d1..96aad07 100644 --- a/be/src/exec/hdfs-scanner-ir.cc +++ b/be/src/exec/hdfs-scanner-ir.cc @@ -76,8 +76,8 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_ return tuples_returned; } -ExprContext* HdfsScanner::GetConjunctCtx(int idx) const { - return (*scanner_conjunct_ctxs_)[idx]; +ScalarExprEvaluator* HdfsScanner::GetConjunctEval(int idx) const { + return (*conjunct_evals_)[idx]; } // Define the string parsing functions for llvm. 
Stamp out the templated functions http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index a529668..991c40f 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -29,7 +29,7 @@ #include "exec/hdfs-scan-node-mt.h" #include "exec/read-write-util.h" #include "exec/text-converter.inline.h" -#include "exprs/expr-context.h" +#include "exprs/scalar-expr.h" #include "runtime/collection-value-builder.h" #include "runtime/descriptors.h" #include "runtime/hdfs-fs-cache.h" @@ -64,7 +64,8 @@ HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) stream_(NULL), eos_(false), is_closed_(false), - scanner_conjunct_ctxs_(NULL), + expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())), + conjunct_evals_(NULL), template_tuple_pool_(new MemPool(scan_node->mem_tracker())), template_tuple_(NULL), tuple_byte_size_(scan_node->tuple_desc()->byte_size()), @@ -85,7 +86,7 @@ HdfsScanner::HdfsScanner() stream_(NULL), eos_(false), is_closed_(false), - scanner_conjunct_ctxs_(NULL), + conjunct_evals_(NULL), template_tuple_pool_(NULL), template_tuple_(NULL), tuple_byte_size_(-1), @@ -107,25 +108,33 @@ Status HdfsScanner::Open(ScannerContext* context) { context_ = context; stream_ = context->GetStream(); - // Clone the scan node's conjuncts map. The cloned contexts must be closed by the + // Clone the scan node's conjuncts map. The cloned evaluators must be closed by the // caller. for (const auto& entry: scan_node_->conjuncts_map()) { - RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second, - scan_node_->runtime_state(), &scanner_conjuncts_map_[entry.first])); + RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, scan_node_->runtime_state(), + expr_mem_pool_.get(), entry.second, + &conjunct_evals_map_[entry.first])); } - DCHECK(scanner_conjuncts_map_.find(scan_node_->tuple_desc()->id()) != - scanner_conjuncts_map_.end()); - scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()]; - - // Clone the scan node's dictionary filtering conjuncts map. - for (const auto& entry: scan_node_->dict_filter_conjuncts_map()) { - RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second, - scan_node_->runtime_state(), &scanner_dict_filter_map_[entry.first])); + DCHECK(conjunct_evals_map_.find(scan_node_->tuple_desc()->id()) != + conjunct_evals_map_.end()); + conjunct_evals_ = &conjunct_evals_map_[scan_node_->tuple_desc()->id()]; + + // Set up the scan node's dictionary filtering conjuncts map. + if (scan_node_->thrift_dict_filter_conjuncts_map() != nullptr) { + for (auto& entry : *(scan_node_->thrift_dict_filter_conjuncts_map())) { + // Convert this slot's list of conjunct indices into a list of pointers + // into conjunct_evals_. + for (int conjunct_idx : entry.second) { + DCHECK_LT(conjunct_idx, conjunct_evals_->size()); + DCHECK((*conjunct_evals_)[conjunct_idx] != nullptr); + dict_filter_map_[entry.first].push_back((*conjunct_evals_)[conjunct_idx]); + } + } } // Initialize the template_tuple_. 
template_tuple_ = scan_node_->InitTemplateTuple( - context_->partition_descriptor()->partition_key_value_ctxs(), + context_->partition_descriptor()->partition_key_value_evals(), template_tuple_pool_.get(), state_); template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_; @@ -139,8 +148,10 @@ void HdfsScanner::Close(RowBatch* row_batch) { decompressor_->Close(); decompressor_.reset(); } - for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_); - for (const auto& entry: scanner_dict_filter_map_) Expr::Close(entry.second, state_); + for (auto& entry : conjunct_evals_map_) { + ScalarExprEvaluator::Close(entry.second, state_); + } + expr_mem_pool_->FreeAll(); obj_pool_.Clear(); stream_ = NULL; context_->ClearStreams(); @@ -223,8 +234,8 @@ Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row RETURN_IF_ERROR(state_->GetQueryStatus()); // Free local expr allocations for this thread to avoid accumulating too much // memory from evaluating the scanner conjuncts. - for (const auto& entry: scanner_conjuncts_map_) { - ExprContext::FreeLocalAllocations(entry.second); + for (const auto& entry: conjunct_evals_map_) { + ScalarExprEvaluator::FreeLocalAllocations(entry.second); } return Status::OK(); } @@ -232,7 +243,7 @@ Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) { DCHECK_GE(num_tuples, 0); DCHECK_EQ(scan_node_->tuple_idx(), 0); - DCHECK_EQ(scanner_conjunct_ctxs_->size(), 0); + DCHECK_EQ(conjunct_evals_->size(), 0); if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples; Tuple** row_tuple = reinterpret_cast<Tuple**>(row); @@ -306,10 +317,10 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields, // %error_in_row2 = or i1 false, %slot_parse_error // %3 = zext i1 %slot_parse_error to i8 // store i8 %3, i8* %slot_error_ptr -// %4 = call %"class.impala::ExprContext"* @GetConjunctCtx( +// %4 = call %"class.impala::ScalarExprEvaluator"* @GetConjunctCtx( // %"class.impala::HdfsScanner"* %this, i32 0) // %conjunct_eval = call i16 @"impala::Operators::Eq_StringVal_StringValWrapper"( -// %"class.impala::ExprContext"* %4, %"class.impala::TupleRow"* %tuple_row) +// %"class.impala::ScalarExprEvaluator"* %4, %"class.impala::TupleRow"* %tuple_row) // %5 = ashr i16 %conjunct_eval, 8 // %6 = trunc i16 %5 to i8 // %val = trunc i8 %6 to i1 @@ -324,7 +335,7 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields, // ret i1 false // } Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node, - LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs, + LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts, Function** write_complete_tuple_fn) { *write_complete_tuple_fn = NULL; SCOPED_TIMER(codegen->codegen_timer()); @@ -436,7 +447,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node, builder.CreateBr(parse_block); // Loop through all the conjuncts in order and materialize slots as necessary to - // evaluate the conjuncts (e.g. conjunct_ctxs[0] will have the slots it references + // evaluate the conjuncts (e.g. conjuncts[0] will have the slots it references // first). // materialized_order[slot_idx] represents the first conjunct which needs that slot. 
 // Slots are only materialized if its order matches the current conjunct being
@@ -444,7 +455,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
   // needed and that at the end of the materialize loop, the conjunct has everything
   // it needs (either from this iteration or previous iterations).
   builder.SetInsertPoint(parse_block);
-  for (int conjunct_idx = 0; conjunct_idx <= conjunct_ctxs.size(); ++conjunct_idx) {
+  for (int conjunct_idx = 0; conjunct_idx <= conjuncts.size(); ++conjunct_idx) {
     for (int slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) {
       // If they don't match, it means either the slot has already been
       // materialized for a previous conjunct or will be materialized later for
@@ -452,7 +463,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
       // yet.
       if (materialize_order[slot_idx] != conjunct_idx) continue;
-      // Materialize slots[slot_idx] to evaluate conjunct_ctxs[conjunct_idx]
+      // Materialize slots[slot_idx] to evaluate conjuncts[conjunct_idx]
       // All slots[i] with materialized_order[i] < conjunct_idx have already been
       // materialized by prior iterations through the outer loop
@@ -500,7 +511,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
       builder.CreateStore(slot_error, error_ptr);
     }
-    if (conjunct_idx == conjunct_ctxs.size()) {
+    if (conjunct_idx == conjuncts.size()) {
       // In this branch, we've just materialized slots not referenced by any conjunct.
       // This slots are the last to get materialized. If we are in this branch, the
       // tuple passed all conjuncts and should be added to the row batch.
@@ -508,12 +519,12 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
       builder.CreateStore(error_ret, error_in_row_arg);
       builder.CreateRet(codegen->true_value());
     } else {
-      // All slots for conjunct_ctxs[conjunct_idx] are materialized, evaluate the partial
+      // All slots for conjuncts[conjunct_idx] are materialized, evaluate the partial
       // tuple against that conjunct and start a new parse_block for the next conjunct
       parse_block = BasicBlock::Create(context, "parse", fn, eval_fail_block);
       Function* conjunct_fn;
       Status status =
-          conjunct_ctxs[conjunct_idx]->root()->GetCodegendComputeFn(codegen, &conjunct_fn);
+          conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, &conjunct_fn);
       if (!status.ok()) {
         stringstream ss;
         ss << "Failed to codegen conjunct: " << status.GetDetail();
@@ -526,12 +537,12 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
         codegen->SetNoInline(conjunct_fn);
       }
-      Function* get_ctx_fn =
-          codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_CTX, false);
-      Value* ctx = builder.CreateCall(get_ctx_fn,
+      Function* get_eval_fn =
+          codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_EVALUATOR, false);
+      Value* eval = builder.CreateCall(get_eval_fn,
           ArrayRef<Value*>({this_arg, codegen->GetIntConstant(TYPE_INT, conjunct_idx)}));
-      Value* conjunct_args[] = {ctx, tuple_row_arg};
+      Value* conjunct_args[] = {eval, tuple_row_arg};
       CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
           codegen, &builder, TYPE_BOOLEAN, conjunct_fn, conjunct_args, "conjunct_eval");
       builder.CreateCondBr(result.GetVal(), parse_block, eval_fail_block);
@@ -543,7 +554,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
   builder.SetInsertPoint(eval_fail_block);
   builder.CreateRet(codegen->false_value());
-  if (node->materialized_slots().size() + conjunct_ctxs.size()
+  if (node->materialized_slots().size() + conjuncts.size()
       > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(fn);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 035b41c..03ca624 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -206,16 +206,22 @@ class HdfsScanner {
   /// Starts as false and is set to true in Close().
   bool is_closed_;
-  /// Clones of the conjuncts ExprContexts in scan_node_->conjuncts_map(). Each scanner
-  /// has its own ExprContexts so the conjuncts can be safely evaluated in parallel.
-  HdfsScanNodeBase::ConjunctsMap scanner_conjuncts_map_;
+  /// MemPool used for expression evaluators in this scanner. Need to be local
+  /// to each scanner as MemPool is not thread safe.
+  boost::scoped_ptr<MemPool> expr_mem_pool_;
-  // Convenience reference to scanner_conjuncts_map_[scan_node_->tuple_idx()] for scanners
-  // that do not support nested types.
-  const std::vector<ExprContext*>* scanner_conjunct_ctxs_;
+  /// Clones of the conjuncts' evaluators in scan_node_->conjuncts_map().
+  /// Each scanner has its own ScalarExprEvaluators so the conjuncts can be safely
+  /// evaluated in parallel.
+  HdfsScanNodeBase::ConjunctEvaluatorsMap conjunct_evals_map_;
-  // Clones of the conjuncts ExprContexts in scan_node_->dict_filter_conjuncts_map().
-  HdfsScanNodeBase::DictFilterConjunctsMap scanner_dict_filter_map_;
+  // Convenience reference to conjuncts_evals_map_[scan_node_->tuple_idx()] for
+  // scanners that do not support nested types.
+  const std::vector<ScalarExprEvaluator*>* conjunct_evals_;
+
+  // Clones of the conjuncts' evaluators in scan_node_->dict_filter_conjuncts_map().
+  typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> DictFilterConjunctsMap;
+  DictFilterConjunctsMap dict_filter_map_;
   /// Holds memory for template tuples. The memory in this pool must remain valid as long
   /// as the row batches produced by this scanner. This typically means that the
@@ -359,12 +365,12 @@ class HdfsScanner {
     return Status::OK();
   }
-  /// Convenience function for evaluating conjuncts using this scanner's ExprContexts.
+  /// Convenience function for evaluating conjuncts using this scanner's ScalarExprEvaluators.
   /// This must always be inlined so we can correctly replace the call to
   /// ExecNode::EvalConjuncts() during codegen.
   bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow* row) {
-    return ExecNode::EvalConjuncts(&(*scanner_conjunct_ctxs_)[0],
-        scanner_conjunct_ctxs_->size(), row);
+    return ExecNode::EvalConjuncts(&(*conjunct_evals_)[0],
+        conjunct_evals_->size(), row);
   }
   /// Sets 'num_tuples' template tuples in the batch that 'row' points to. Assumes the
@@ -436,7 +442,7 @@ class HdfsScanner {
   /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
   /// if codegen was successful or NULL otherwise.
   static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
-      const std::vector<ExprContext*>& conjunct_ctxs,
+      const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** write_complete_tuple_fn);
   /// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross
@@ -512,10 +518,10 @@ class HdfsScanner {
     return reinterpret_cast<TupleRow*>(mem + sizeof(Tuple*));
   }
-  /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of
+  /// Simple wrapper around conjunct_evals_[idx]. Used in the codegen'd version of
   /// WriteCompleteTuple() because it's easier than writing IR to access
-  /// scanner_conjunct_ctxs_.
-  ExprContext* GetConjunctCtx(int idx) const;
+  /// conjunct_evals_.
+  ScalarExprEvaluator* GetConjunctEval(int idx) const;
   /// Unit test constructor
   HdfsScanner();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 275f96b..355a554 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -53,13 +53,13 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 // Codegen for materialized parsed data into tuples.
 Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
+    const vector<ScalarExpr*>& conjuncts, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
   DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != NULL);
   Function* write_complete_tuple_fn;
-  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
+  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
       &write_complete_tuple_fn));
   DCHECK(write_complete_tuple_fn != NULL);
   RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
@@ -268,11 +268,11 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
         delimited_text_parser_->escape_char() == '\0');
     // last argument: seq always starts at record_location[0]
     tuples_returned = write_tuples_fn_(this, pool, tuple_row,
-        batch_->row_byte_size(), &field_locations_[0], num_to_process,
+        batch_->row_byte_size(), field_locations_.data(), num_to_process,
        max_added_tuples, scan_node_->materialized_slots().size(), 0);
   } else {
     tuples_returned = WriteAlignedTuples(pool, tuple_row,
-        batch_->row_byte_size(), &field_locations_[0], num_to_process,
+        batch_->row_byte_size(), field_locations_.data(), num_to_process,
        max_added_tuples, scan_node_->materialized_slots().size(), 0);
   }
@@ -321,17 +321,17 @@ Status HdfsSequenceScanner::ProcessRange() {
       RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(
           1, record_locations_[0].len, reinterpret_cast<char**>(&record_start),
-          &row_end_loc, &field_locations_[0], &num_tuples, &num_fields, &col_start));
+          &row_end_loc, field_locations_.data(), &num_tuples, &num_fields, &col_start));
       DCHECK(num_tuples == 1);
       uint8_t errors[num_fields];
       memset(errors, 0, num_fields);
-      add_row = WriteCompleteTuple(pool, &field_locations_[0], tuple_, tuple_row_mem,
+      add_row = WriteCompleteTuple(pool, field_locations_.data(), tuple_, tuple_row_mem,
           template_tuple_, &errors[0], &error_in_row);
       if (UNLIKELY(error_in_row)) {
-        ReportTupleParseError(&field_locations_[0], errors);
+        ReportTupleParseError(field_locations_.data(), errors);
         RETURN_IF_ERROR(parse_status_);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 5d6074f..01d5a68 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -172,7 +172,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ExprContext*>& conjunct_ctxs,
+      const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** write_aligned_tuples_fn);
  protected:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index a05aa5d..37463f3 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -20,8 +20,8 @@
 #include "exec/exec-node.h"
 #include "util/hdfs-util.h"
 #include "util/uid-util.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
@@ -44,13 +44,11 @@ const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
     "org.apache.hadoop.io.BytesWritable";
 HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
-    RuntimeState* state, OutputPartition* output,
-    const HdfsPartitionDescriptor* partition,
-    const HdfsTableDescriptor* table_desc,
-    const vector<ExprContext*>& output_exprs)
-  : HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs),
-    mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
-    unflushed_rows_(0), record_compression_(false) {
+    RuntimeState* state, OutputPartition* output,
+    const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc)
+  : HdfsTableWriter(parent, state, output, partition, table_desc),
+    mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
+    unflushed_rows_(0), record_compression_(false) {
   approx_block_size_ = 64 * 1024 * 1024;
   parent->mem_tracker()->Consume(approx_block_size_);
   field_delim_ = partition->field_delim();
@@ -106,7 +104,8 @@ Status HdfsSequenceTableWriter::AppendRows(
   bool all_rows = row_group_indices.empty();
   int num_non_partition_cols =
       table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
+  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
+      << parent_->DebugString();
   {
     SCOPED_TIMER(parent_->encode_timer());
@@ -241,15 +240,16 @@ void HdfsSequenceTableWriter::EncodeRow(TupleRow* row, WriteStream* buf) {
   // TODO Unify with text table writer
   int num_non_partition_cols =
       table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
+  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
+      << parent_->DebugString();
   for (int j = 0; j < num_non_partition_cols; ++j) {
-    void* value = output_expr_ctxs_[j]->GetValue(row);
+    void* value = output_expr_evals_[j]->GetValue(row);
     if (value != NULL) {
-      if (output_expr_ctxs_[j]->root()->type().type == TYPE_STRING) {
+      if (output_expr_evals_[j]->root().type().type == TYPE_STRING) {
         WriteEscapedString(reinterpret_cast<const StringValue*>(value), &row_buf_);
       } else {
         string str;
-        output_expr_ctxs_[j]->PrintValue(value, &str);
+        output_expr_evals_[j]->PrintValue(value, &str);
         buf->WriteBytes(str.size(), str.data());
       }
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
index 3d72926..f315920 100644
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ b/be/src/exec/hdfs-sequence-table-writer.h
@@ -100,11 +100,9 @@ struct OutputPartition;
 /// Output is buffered to fill sequence file blocks.
 class HdfsSequenceTableWriter : public HdfsTableWriter {
  public:
-  HdfsSequenceTableWriter(HdfsTableSink* parent,
-      RuntimeState* state, OutputPartition* output,
-      const HdfsPartitionDescriptor* partition,
-      const HdfsTableDescriptor* table_desc,
-      const std::vector<ExprContext*>& output_exprs);
+  HdfsSequenceTableWriter(HdfsTableSink* parent, RuntimeState* state,
+      OutputPartition* output, const HdfsPartitionDescriptor* partition,
+      const HdfsTableDescriptor* table_desc);
   ~HdfsSequenceTableWriter() { }
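
The common thread in the scanner hunks above is that each scanner instance now holds its own clones of the node's conjunct evaluators (backed by a scanner-local MemPool) and evaluates them against a TupleRow, short-circuiting on the first failing conjunct. The following is a minimal, self-contained sketch of that evaluation pattern; the types below (TupleRow, Evaluator) are illustrative stand-ins, not Impala's actual classes.

// Standalone sketch of the per-scanner conjunct-evaluation pattern (stand-in types).
#include <functional>
#include <iostream>
#include <vector>

struct TupleRow {
  int col0;  // toy payload standing in for materialized slots
};

// Stand-in for a ScalarExprEvaluator: a compiled conjunct bound to per-scanner state.
using Evaluator = std::function<bool(const TupleRow&)>;

// Rough analogue of an EvalConjuncts()-style helper: apply every conjunct to the row
// and return false as soon as one of them fails.
bool EvalConjuncts(const Evaluator* evals, int num_evals, const TupleRow& row) {
  for (int i = 0; i < num_evals; ++i) {
    if (!evals[i](row)) return false;
  }
  return true;
}

int main() {
  // "Cloning" the node's conjuncts: each scanner gets its own evaluator instances,
  // so evaluation is safe to run concurrently across scanner threads.
  std::vector<Evaluator> conjunct_evals = {
      [](const TupleRow& r) { return r.col0 > 0; },
      [](const TupleRow& r) { return r.col0 % 2 == 0; },
  };
  TupleRow row{42};
  std::cout << std::boolalpha
            << EvalConjuncts(conjunct_evals.data(),
                             static_cast<int>(conjunct_evals.size()), row)
            << std::endl;  // prints: true
  return 0;
}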

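The CodegenWriteCompleteTuple() hunks near the top of this section interleave slot materialization with conjunct evaluation: the generated code visits conjuncts in order, materializes only the slots whose materialize_order matches the current conjunct, evaluates that conjunct, and exits early on the first failure; a final extra iteration (conjunct_idx == conjuncts.size()) materializes slots not referenced by any conjunct. Below is a rough interpreted sketch of that control flow with stand-in types; it models the structure of the emitted IR, not Impala's actual code.

// Interpreted sketch of the slot-materialization / conjunct-evaluation interleaving.
#include <functional>
#include <iostream>
#include <vector>

struct Tuple {
  std::vector<int> slots;
};

int main() {
  // materialize_order[slot_idx] == i means the slot is first needed by conjunct i;
  // a value equal to conjuncts.size() marks a slot not referenced by any conjunct.
  std::vector<std::function<int()>> slot_parsers = {
      [] { return 7; }, [] { return -1; }, [] { return 99; }};
  std::vector<int> materialize_order = {0, 1, 2};  // third slot: unreferenced
  std::vector<std::function<bool(const Tuple&)>> conjuncts = {
      [](const Tuple& t) { return t.slots[0] > 0; },
      [](const Tuple& t) { return t.slots[1] > 0; }};  // fails for this row

  Tuple tuple;
  tuple.slots.resize(slot_parsers.size());
  bool keep_row = true;
  for (size_t conjunct_idx = 0; conjunct_idx <= conjuncts.size(); ++conjunct_idx) {
    // Materialize only the slots this conjunct needs; earlier slots are already done.
    for (size_t slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) {
      if (materialize_order[slot_idx] != static_cast<int>(conjunct_idx)) continue;
      tuple.slots[slot_idx] = slot_parsers[slot_idx]();
    }
    if (conjunct_idx == conjuncts.size()) break;  // all conjuncts passed
    if (!conjuncts[conjunct_idx](tuple)) {
      keep_row = false;  // early exit: remaining slots are never materialized
      break;
    }
  }
  std::cout << std::boolalpha << keep_row << std::endl;  // prints: false
  return 0;
}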