Repository: incubator-impala Updated Branches: refs/heads/master c23bf38a2 -> 9e438a08f
IMPALA-3629: Codegen TransferScratchTuples() in hdfs-parquet-scanner We currently don't do any codegen in the hdfs-parquet scanner, which limits performance. This patch creates a new function, ProcessScratchBatch, which contains the inner loop of TransferScratchTuples, allowing us to cross-compile only this performance critical part. It also uses CodegenEvalConjuncts to replace the call to EvalConjuncts in ProcessScratchBatch with a codegen'd version. Additionally, it modifies the Codegen functions in hdfs-avro/text/sequence-scanner to take an output parameter for the codegen'd function and return a Status in order to improve logging around codegen failures. Change-Id: Ic327e437c7cd2b3f92cdb11c1e907bfee2d44ee8 Reviewed-on: http://gerrit.cloudera.org:8080/4093 Reviewed-by: Thomas Tauber-Marshall <[email protected]> Reviewed-by: Matthew Jacobs <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9e438a08 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9e438a08 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9e438a08 Branch: refs/heads/master Commit: 9e438a08f1fa5761a88f0c9329bfedaeae3206d1 Parents: c23bf38 Author: Thomas Tauber-Marshall <[email protected]> Authored: Tue Jul 26 11:28:00 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Tue Aug 23 21:57:55 2016 +0000 ---------------------------------------------------------------------- be/src/codegen/gen_ir_descriptions.py | 1 + be/src/codegen/impala-ir.cc | 1 + be/src/exec/CMakeLists.txt | 1 + be/src/exec/hdfs-avro-scanner.cc | 67 +++++++++++--------- be/src/exec/hdfs-avro-scanner.h | 24 ++++--- be/src/exec/hdfs-parquet-scanner-ir.cc | 65 +++++++++++++++++++ be/src/exec/hdfs-parquet-scanner.cc | 98 ++++++++++++++++------------- be/src/exec/hdfs-parquet-scanner.h | 14 +++++ be/src/exec/hdfs-scan-node.cc | 19 ++++-- be/src/exec/hdfs-scanner.cc | 37 +++++++---- be/src/exec/hdfs-scanner.h | 21 ++++--- be/src/exec/hdfs-sequence-scanner.cc | 23 ++++--- be/src/exec/hdfs-sequence-scanner.h | 7 ++- be/src/exec/hdfs-text-scanner.cc | 23 ++++--- be/src/exec/hdfs-text-scanner.h | 7 ++- 15 files changed, 278 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/codegen/gen_ir_descriptions.py ---------------------------------------------------------------------- diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py index 6c6761f..9ba9d78 100755 --- a/be/src/codegen/gen_ir_descriptions.py +++ b/be/src/codegen/gen_ir_descriptions.py @@ -113,6 +113,7 @@ ir_functions = [ ["READ_AVRO_DECIMAL", "ReadAvroDecimal"], ["HDFS_SCANNER_WRITE_ALIGNED_TUPLES", "WriteAlignedTuples"], ["HDFS_SCANNER_GET_CONJUNCT_CTX", "GetConjunctCtx"], + ["PROCESS_SCRATCH_BATCH", "ProcessScratchBatch"], ["STRING_TO_BOOL", "IrStringToBool"], ["STRING_TO_INT8", "IrStringToInt8"], ["STRING_TO_INT16", "IrStringToInt16"], http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/codegen/impala-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc index 2221051..185d6c2 100644 --- a/be/src/codegen/impala-ir.cc +++ b/be/src/codegen/impala-ir.cc @@ -26,6 +26,7 @@ #include "exec/hash-join-node-ir.cc" #include "exec/hash-table-ir.cc" #include "exec/hdfs-avro-scanner-ir.cc" +#include "exec/hdfs-parquet-scanner-ir.cc" #include "exec/hdfs-scanner-ir.cc" #include "exec/partitioned-aggregation-node-ir.cc" #include "exec/partitioned-hash-join-node-ir.cc" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index afb1a59..091b9b7 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -60,6 +60,7 @@ add_library(Exec hdfs-text-table-writer.cc hdfs-sequence-table-writer.cc hdfs-parquet-scanner.cc + hdfs-parquet-scanner-ir.cc hdfs-parquet-table-writer.cc hbase-scan-node.cc hbase-table-scanner.cc http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-avro-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index 1c31718..ff278de 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -78,14 +78,21 @@ Status HdfsAvroScanner::Open(ScannerContext* context) { return Status::OK(); } -Function* HdfsAvroScanner::Codegen(HdfsScanNode* node, - const vector<ExprContext*>& conjunct_ctxs) { - if (!node->runtime_state()->codegen_enabled()) return NULL; +Status HdfsAvroScanner::Codegen(HdfsScanNode* node, + const vector<ExprContext*>& conjunct_ctxs, Function** decode_avro_data_fn) { + *decode_avro_data_fn = NULL; + if (!node->runtime_state()->codegen_enabled()) { + return Status("Disabled by query option."); + } LlvmCodeGen* codegen; - if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL; - Function* materialize_tuple_fn = CodegenMaterializeTuple(node, codegen); - if (materialize_tuple_fn == NULL) return NULL; - return CodegenDecodeAvroData(node->runtime_state(), materialize_tuple_fn, conjunct_ctxs); + RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen)); + Function* materialize_tuple_fn; + RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, &materialize_tuple_fn)); + DCHECK(materialize_tuple_fn != NULL); + RETURN_IF_ERROR(CodegenDecodeAvroData(node->runtime_state(), materialize_tuple_fn, + conjunct_ctxs, decode_avro_data_fn)); + DCHECK(*decode_avro_data_fn != NULL); + return Status::OK(); } BaseSequenceScanner::FileHeader* HdfsAvroScanner::AllocateFileHeader() { @@ -765,8 +772,8 @@ void HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_ // bail_out: ; preds = %read_field11, %end_field3, %read_field2, %end_field, // ret i1 false ; %read_field, %entry // } -Function* HdfsAvroScanner::CodegenMaterializeTuple( - HdfsScanNode* node, LlvmCodeGen* codegen) { +Status HdfsAvroScanner::CodegenMaterializeTuple(HdfsScanNode* node, LlvmCodeGen* codegen, + Function** materialize_tuple_fn) { LLVMContext& context = codegen->context(); LlvmCodeGen::LlvmBuilder builder(context); @@ -776,7 +783,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple( TupleDescriptor* tuple_desc = const_cast<TupleDescriptor*>(node->tuple_desc()); StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen); - if (tuple_type == NULL) return NULL; // Could not generate tuple struct + if (tuple_type == NULL) return Status("Could not generate tuple struct."); Type* tuple_ptr_type = PointerType::get(tuple_type, 0); Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME); @@ -814,7 +821,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple( if (!status.ok()) { VLOG_QUERY << status.GetDetail(); fn->eraseFromParent(); - return NULL; + return status; } // Returns true on successful decoding. @@ -824,7 +831,11 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple( builder.SetInsertPoint(bail_out_block); builder.CreateRet(codegen->false_value()); - return codegen->FinalizeFunction(fn); + *materialize_tuple_fn = codegen->FinalizeFunction(fn); + if (*materialize_tuple_fn == NULL) { + return Status("Failed to finalize materialize_tuple_fn."); + } + return Status::OK(); } Status HdfsAvroScanner::CodegenReadRecord( @@ -1005,32 +1016,30 @@ Status HdfsAvroScanner::CodegenReadScalar(const AvroSchemaElement& element, return Status::OK(); } -// TODO: return Status -Function* HdfsAvroScanner::CodegenDecodeAvroData(RuntimeState* state, - Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs) { +Status HdfsAvroScanner::CodegenDecodeAvroData(RuntimeState* state, + Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs, + Function** decode_avro_data_fn) { LlvmCodeGen* codegen; - if (!state->GetCodegen(&codegen).ok()) return NULL; + RETURN_IF_ERROR(state->GetCodegen(&codegen)); SCOPED_TIMER(codegen->codegen_timer()); DCHECK(materialize_tuple_fn != NULL); - Function* decode_avro_data_fn = - codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true); + Function* fn = codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true); - int replaced = codegen->ReplaceCallSites(decode_avro_data_fn, materialize_tuple_fn, - "MaterializeTuple"); + int replaced = codegen->ReplaceCallSites(fn, materialize_tuple_fn, "MaterializeTuple"); DCHECK_EQ(replaced, 1); Function* eval_conjuncts_fn; - Status status = - ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs, &eval_conjuncts_fn); - if (!status.ok()) return NULL; + RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs, + &eval_conjuncts_fn)); - replaced = codegen->ReplaceCallSites(decode_avro_data_fn, eval_conjuncts_fn, - "EvalConjuncts"); + replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts"); DCHECK_EQ(replaced, 1); - decode_avro_data_fn->setName("DecodeAvroData"); - decode_avro_data_fn = codegen->FinalizeFunction(decode_avro_data_fn); - DCHECK(decode_avro_data_fn != NULL); - return decode_avro_data_fn; + fn->setName("DecodeAvroData"); + *decode_avro_data_fn = codegen->FinalizeFunction(fn); + if (*decode_avro_data_fn == NULL) { + return Status("Failed to finalize decode_avro_data_fn."); + } + return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-avro-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h index fbb519f..f95cd10 100644 --- a/be/src/exec/hdfs-avro-scanner.h +++ b/be/src/exec/hdfs-avro-scanner.h @@ -93,9 +93,10 @@ class HdfsAvroScanner : public BaseSequenceScanner { virtual Status Open(ScannerContext* context); - /// Codegen parsing records, writing tuples and evaluating predicates. - static llvm::Function* Codegen(HdfsScanNode*, - const std::vector<ExprContext*>& conjunct_ctxs); + /// Codegen DecodeAvroData(). Stores the resulting function in 'decode_avro_data_fn' if + /// codegen was successful or NULL otherwise. + static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs, + llvm::Function** decode_avro_data_fn); protected: /// Implementation of BaseSeqeunceScanner super class methods @@ -193,16 +194,19 @@ class HdfsAvroScanner : public BaseSequenceScanner { uint8_t** data, uint8_t* data_end, Tuple* tuple); /// Produces a version of DecodeAvroData that uses codegen'd instead of interpreted - /// functions. - static llvm::Function* CodegenDecodeAvroData( - RuntimeState* state, llvm::Function* materialize_tuple_fn, - const std::vector<ExprContext*>& conjunct_ctxs); + /// functions. Stores the resulting function in 'decode_avro_data_fn' if codegen was + /// successful or returns an error. + static Status CodegenDecodeAvroData(RuntimeState* state, + llvm::Function* materialize_tuple_fn, + const std::vector<ExprContext*>& conjunct_ctxs, + llvm::Function** decode_avro_data_fn); /// Codegens a version of MaterializeTuple() that reads records based on the table - /// schema. + /// schema. Stores the resulting function in 'materialize_tuple_fn' if codegen was + /// successful or returns an error. /// TODO: Codegen a function for each unique file schema. - static llvm::Function* CodegenMaterializeTuple(HdfsScanNode* node, - LlvmCodeGen* codegen); + static Status CodegenMaterializeTuple(HdfsScanNode* node, LlvmCodeGen* codegen, + llvm::Function** materialize_tuple_fn); /// Used by CodegenMaterializeTuple to recursively create the IR for reading an Avro /// record. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-parquet-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner-ir.cc b/be/src/exec/hdfs-parquet-scanner-ir.cc new file mode 100644 index 0000000..1cd5252 --- /dev/null +++ b/be/src/exec/hdfs-parquet-scanner-ir.cc @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/hdfs-parquet-scanner.h" + +#include "exec/parquet-scratch-tuple-batch.h" +#include "runtime/tuple-row.h" + +using namespace impala; + +int HdfsParquetScanner::ProcessScratchBatch(RowBatch* dst_batch) { + const bool has_filters = !filter_ctxs_.empty(); + ExprContext* const* conjunct_ctxs = &(*scanner_conjunct_ctxs_)[0]; + const int num_conjuncts = scanner_conjunct_ctxs_->size(); + + // Start/end/current iterators over the output rows. + Tuple** output_row_start = + reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows())); + Tuple** output_row_end = + output_row_start + (dst_batch->capacity() - dst_batch->num_rows()); + Tuple** output_row = output_row_start; + + // Start/end/current iterators over the scratch tuples. + uint8_t* scratch_tuple_start = scratch_batch_->CurrTuple(); + uint8_t* scratch_tuple_end = scratch_batch_->TupleEnd(); + uint8_t* scratch_tuple = scratch_tuple_start; + const int tuple_size = scratch_batch_->tuple_byte_size; + + // Loop until the scratch batch is exhausted or the output batch is full. + // Do not use batch_->AtCapacity() in this loop because it is not necessary + // to perform the memory capacity check. + while (scratch_tuple != scratch_tuple_end) { + *output_row = reinterpret_cast<Tuple*>(scratch_tuple); + scratch_tuple += tuple_size; + // Evaluate runtime filters and conjuncts. Short-circuit the evaluation if + // the filters/conjuncts are empty to avoid function calls. + if (has_filters && !EvalRuntimeFilters(reinterpret_cast<TupleRow*>(output_row))) { + continue; + } + if (!ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts, + reinterpret_cast<TupleRow*>(output_row))) { + continue; + } + // Row survived runtime filters and conjuncts. + ++output_row; + if (output_row == output_row_end) break; + } + scratch_batch_->tuple_idx += (scratch_tuple - scratch_tuple_start) / tuple_size; + + return output_row - output_row_start; +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 2aaf8de..30c07a4 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -23,6 +23,7 @@ #include <gflags/gflags.h> #include <gutil/strings/substitute.h> +#include "codegen/llvm-codegen.h" #include "common/logging.h" #include "exec/hdfs-scanner.h" #include "exec/hdfs-scan-node.h" @@ -45,6 +46,7 @@ #include "common/names.h" +using llvm::Function; using strings::Substitute; using namespace impala; @@ -151,7 +153,8 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* scan_node, RuntimeState* st dictionary_pool_(new MemPool(scan_node->mem_tracker())), assemble_rows_timer_(scan_node_->materialize_tuple_timer()), num_cols_counter_(NULL), - num_row_groups_counter_(NULL) { + num_row_groups_counter_(NULL), + codegend_process_scratch_batch_fn_(NULL) { assemble_rows_timer_.Stop(); } @@ -164,7 +167,13 @@ Status HdfsParquetScanner::Open(ScannerContext* context) { num_row_groups_counter_ = ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT); - scan_node_->IncNumScannersCodegenDisabled(); + codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>( + scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET)); + if (codegend_process_scratch_batch_fn_ == NULL) { + scan_node_->IncNumScannersCodegenDisabled(); + } else { + scan_node_->IncNumScannersCodegenEnabled(); + } level_cache_pool_.reset(new MemPool(scan_node_->mem_tracker())); @@ -562,33 +571,17 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { // we always call CommitRows() after TransferScratchTuples(), the output batch can // never be empty. DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity()); - - const bool has_filters = !filter_ctxs_.empty(); - const bool has_conjuncts = !scanner_conjunct_ctxs_->empty(); - ExprContext* const* conjunct_ctxs = &(*scanner_conjunct_ctxs_)[0]; - const int num_conjuncts = scanner_conjunct_ctxs_->size(); - - // Start/end/current iterators over the output rows. DCHECK_EQ(scan_node_->tuple_idx(), 0); DCHECK_EQ(dst_batch->row_desc().tuple_descriptors().size(), 1); - Tuple** output_row_start = - reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows())); - Tuple** output_row_end = - output_row_start + (dst_batch->capacity() - dst_batch->num_rows()); - Tuple** output_row = output_row_start; - - // Start/end/current iterators over the scratch tuples. - uint8_t* scratch_tuple_start = scratch_batch_->CurrTuple(); - uint8_t* scratch_tuple_end = scratch_batch_->TupleEnd(); - uint8_t* scratch_tuple = scratch_tuple_start; - const int tuple_size = scratch_batch_->tuple_byte_size; - - if (tuple_size == 0) { + + if (scratch_batch_->tuple_byte_size == 0) { + Tuple** output_row = + reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows())); // We are materializing a collection with empty tuples. Add a NULL tuple to the // output batch per remaining scratch tuple and return. No need to evaluate // filters/conjuncts. - DCHECK(!has_filters); - DCHECK(!has_conjuncts); + DCHECK(filter_ctxs_.empty()); + DCHECK(scanner_conjunct_ctxs_->empty()); int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(), scratch_batch_->num_tuples - scratch_batch_->tuple_idx); memset(output_row, 0, num_tuples * sizeof(Tuple*)); @@ -599,26 +592,12 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { return num_tuples; } - // Loop until the scratch batch is exhausted or the output batch is full. - // Do not use batch_->AtCapacity() in this loop because it is not necessary - // to perform the memory capacity check. - while (scratch_tuple != scratch_tuple_end) { - *output_row = reinterpret_cast<Tuple*>(scratch_tuple); - scratch_tuple += tuple_size; - // Evaluate runtime filters and conjuncts. Short-circuit the evaluation if - // the filters/conjuncts are empty to avoid function calls. - if (has_filters && !EvalRuntimeFilters(reinterpret_cast<TupleRow*>(output_row))) { - continue; - } - if (has_conjuncts && !ExecNode::EvalConjuncts( - conjunct_ctxs, num_conjuncts, reinterpret_cast<TupleRow*>(output_row))) { - continue; - } - // Row survived runtime filters and conjuncts. - ++output_row; - if (output_row == output_row_end) break; + int num_row_to_commit; + if (codegend_process_scratch_batch_fn_ != NULL) { + num_row_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch); + } else { + num_row_to_commit = ProcessScratchBatch(dst_batch); } - scratch_batch_->tuple_idx += (scratch_tuple - scratch_tuple_start) / tuple_size; // TODO: Consider compacting the output row batch to better handle cases where // there are few surviving tuples per scratch batch. In such cases, we could @@ -627,7 +606,38 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) { if (scratch_batch_->AtEnd()) { dst_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), false); } - return output_row - output_row_start; + return num_row_to_commit; +} + +Status HdfsParquetScanner::Codegen(HdfsScanNode* node, + const vector<ExprContext*>& conjunct_ctxs, Function** process_scratch_batch_fn) { + *process_scratch_batch_fn = NULL; + if (!node->runtime_state()->codegen_enabled()) { + return Status("Disabled by query option."); + } + + LlvmCodeGen* codegen; + RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen)); + DCHECK(codegen != NULL); + SCOPED_TIMER(codegen->codegen_timer()); + + Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true); + DCHECK(fn != NULL); + + Function* eval_conjuncts_fn; + RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(node->runtime_state(), conjunct_ctxs, + &eval_conjuncts_fn)); + DCHECK(eval_conjuncts_fn != NULL); + + int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts"); + DCHECK_EQ(replaced, 1); + + fn->setName("ProcessScratchBatch"); + *process_scratch_batch_fn = codegen->FinalizeFunction(fn); + if (*process_scratch_batch_fn == NULL) { + return Status("Failed to finalize process_scratch_batch_fn."); + } + return Status::OK(); } bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index dfd7785..425444f 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -333,6 +333,11 @@ class HdfsParquetScanner : public HdfsScanner { virtual Status ProcessSplit(); virtual void Close(RowBatch* row_batch); + /// Codegen ProcessScratchBatch(). Stores the resulting function in + /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise. + static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs, + llvm::Function** process_scratch_batch_fn); + /// The repetition level is set to this value to indicate the end of a row group. static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min(); /// Indicates an invalid definition or repetition level. @@ -427,6 +432,10 @@ class HdfsParquetScanner : public HdfsScanner { /// Number of row groups that need to be read. RuntimeProfile::Counter* num_row_groups_counter_; + typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*); + /// The codegen'd version of ProcessScratchBatch() if available, NULL otherwise. + ProcessScratchBatchFn codegend_process_scratch_batch_fn_; + const char* filename() const { return metadata_range_->file(); } virtual Status GetNextInternal(RowBatch* row_batch, bool* eos); @@ -464,6 +473,11 @@ class HdfsParquetScanner : public HdfsScanner { /// Returns the number of rows that should be committed to the given batch. int TransferScratchTuples(RowBatch* dst_batch); + /// Processes a single row batch for TransferScratchTuples, looping over scratch_batch_ + /// until it is exhausted or the output is full. Called for the case when there are + /// materialized tuples. This is a separate function so it can be codegened. + int ProcessScratchBatch(RowBatch* dst_batch); + /// Evaluates runtime filters (if any) against the given row. Returns true if /// they passed, false otherwise. Maintains the runtime filter stats, determines /// whether the filters are effective, and disables them if they are not. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 6077f99..9ed78de 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -672,21 +672,32 @@ Status HdfsScanNode::Prepare(RuntimeState* state) { // Create reusable codegen'd functions for each file type type needed // TODO: do this for conjuncts_map_ Function* fn; + Status status; switch (format) { case THdfsFileFormat::TEXT: - fn = HdfsTextScanner::Codegen(this, conjunct_ctxs_); + status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn); break; case THdfsFileFormat::SEQUENCE_FILE: - fn = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_); + status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn); break; case THdfsFileFormat::AVRO: - fn = HdfsAvroScanner::Codegen(this, conjunct_ctxs_); + status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn); + break; + case THdfsFileFormat::PARQUET: + status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, &fn); break; default: // No codegen for this format fn = NULL; + status = Status("Not implemented for this format."); } - if (fn != NULL) { + DCHECK(fn != NULL || !status.ok()); + + const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second; + if (!status.ok()) { + AddCodegenExecOption(false, status, format_name); + } else { + AddCodegenExecOption(true, status, format_name); LlvmCodeGen* codegen; RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen)); codegen->AddFunctionToJit( http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 81cd1f0..b0a30d9 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -357,16 +357,21 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields, // eval_fail: ; preds = %parse // ret i1 false // } -Function* HdfsScanner::CodegenWriteCompleteTuple( - HdfsScanNode* node, LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs) { +Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* codegen, + const vector<ExprContext*>& conjunct_ctxs, Function** write_complete_tuple_fn) { + *write_complete_tuple_fn = NULL; SCOPED_TIMER(codegen->codegen_timer()); RuntimeState* state = node->runtime_state(); // TODO: Timestamp is not yet supported for (int i = 0; i < node->materialized_slots().size(); ++i) { SlotDescriptor* slot_desc = node->materialized_slots()[i]; - if (slot_desc->type().type == TYPE_TIMESTAMP) return NULL; - if (slot_desc->type().type == TYPE_DECIMAL) return NULL; + if (slot_desc->type().type == TYPE_TIMESTAMP) { + return Status("Timestamp not yet supported for codegen."); + } + if (slot_desc->type().type == TYPE_DECIMAL) { + return Status("Decimal not yet supported for codegen."); + } } // Cast away const-ness. The codegen only sets the cached typed llvm struct. @@ -377,7 +382,7 @@ Function* HdfsScanner::CodegenWriteCompleteTuple( Function* fn = TextConverter::CodegenWriteSlot(codegen, tuple_desc, slot_desc, node->hdfs_table()->null_column_value().data(), node->hdfs_table()->null_column_value().size(), true, state->strict_mode()); - if (fn == NULL) return NULL; + if (fn == NULL) return Status("CodegenWriteSlot failed."); slot_fns.push_back(fn); } @@ -409,7 +414,7 @@ Function* HdfsScanner::CodegenWriteCompleteTuple( // Generate the typed llvm struct for the output tuple StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen); - if (tuple_type == NULL) return NULL; + if (tuple_type == NULL) return Status("Could not generate tuple struct."); PointerType* tuple_ptr_type = PointerType::get(tuple_type, 0); // Initialize the function prototype. This needs to match @@ -533,7 +538,7 @@ Function* HdfsScanner::CodegenWriteCompleteTuple( ss << "Failed to codegen conjunct: " << status.GetDetail(); state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str())); fn->eraseFromParent(); - return NULL; + return status; } Function* get_ctx_fn = @@ -553,12 +558,16 @@ Function* HdfsScanner::CodegenWriteCompleteTuple( builder.SetInsertPoint(eval_fail_block); builder.CreateRet(codegen->false_value()); - codegen->FinalizeFunction(fn); - return codegen->FinalizeFunction(fn); + *write_complete_tuple_fn = codegen->FinalizeFunction(fn); + if (*write_complete_tuple_fn == NULL) { + return Status("Failed to finalize write_complete_tuple_fn."); + } + return Status::OK(); } -Function* HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node, - LlvmCodeGen* codegen, Function* write_complete_tuple_fn) { +Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node, LlvmCodeGen* codegen, + Function* write_complete_tuple_fn, Function** write_aligned_tuples_fn) { + *write_aligned_tuples_fn = NULL; SCOPED_TIMER(codegen->codegen_timer()); DCHECK(write_complete_tuple_fn != NULL); @@ -570,7 +579,11 @@ Function* HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node, "WriteCompleteTuple"); DCHECK_EQ(replaced, 1); - return codegen->FinalizeFunction(write_tuples_fn); + *write_aligned_tuples_fn = codegen->FinalizeFunction(write_tuples_fn); + if (*write_aligned_tuples_fn == NULL) { + return Status("Failed to finalize write_aligned_tuples_fn."); + } + return Status::OK(); } Status HdfsScanner::UpdateDecompressor(const THdfsCompression::type& compression) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 7ddf0a5..b7bc89b 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -413,15 +413,18 @@ class HdfsScanner { uint8_t* error_in_row); /// Codegen function to replace WriteCompleteTuple. Should behave identically - /// to WriteCompleteTuple. - static llvm::Function* CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*, - const std::vector<ExprContext*>& conjunct_ctxs); - - /// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross compiled - /// to IR. This function loads the precompiled IR function, modifies it and returns the - /// resulting function. - static llvm::Function* CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*, - llvm::Function* write_tuple_fn); + /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn' + /// if codegen was successful or NULL otherwise. + static Status CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*, + const std::vector<ExprContext*>& conjunct_ctxs, + llvm::Function** write_complete_tuple_fn); + + /// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross + /// compiled to IR. This function loads the precompiled IR function, modifies it, + /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was + /// successful or NULL otherwise. + static Status CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*, + llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn); /// Report parse error for column @ desc. If abort_on_error is true, sets /// parse_status_ to the error message. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index 1bc0826..b5542f6 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -52,15 +52,22 @@ HdfsSequenceScanner::~HdfsSequenceScanner() { } // Codegen for materialized parsed data into tuples. -Function* HdfsSequenceScanner::Codegen(HdfsScanNode* node, - const vector<ExprContext*>& conjunct_ctxs) { - if (!node->runtime_state()->codegen_enabled()) return NULL; +Status HdfsSequenceScanner::Codegen(HdfsScanNode* node, + const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) { + *write_aligned_tuples_fn = NULL; + if (!node->runtime_state()->codegen_enabled()) { + return Status("Disabled by query option."); + } LlvmCodeGen* codegen; - if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL; - Function* write_complete_tuple_fn = - CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs); - if (write_complete_tuple_fn == NULL) return NULL; - return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn); + RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen)); + Function* write_complete_tuple_fn; + RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs, + &write_complete_tuple_fn)); + DCHECK(write_complete_tuple_fn != NULL); + RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn, + write_aligned_tuples_fn)); + DCHECK(*write_aligned_tuples_fn != NULL); + return Status::OK(); } Status HdfsSequenceScanner::InitNewRange() { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-sequence-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h index 2663f27..1246b5a 100644 --- a/be/src/exec/hdfs-sequence-scanner.h +++ b/be/src/exec/hdfs-sequence-scanner.h @@ -170,9 +170,10 @@ class HdfsSequenceScanner : public BaseSequenceScanner { /// Implementation of HdfsScanner interface. virtual Status Open(ScannerContext* context); - /// Codegen writing tuples and evaluating predicates. - static llvm::Function* Codegen(HdfsScanNode*, - const std::vector<ExprContext*>& conjunct_ctxs); + /// Codegen WriteAlignedTuples(). Stores the resulting function in + /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise. + static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs, + llvm::Function** write_aligned_tuples_fn); protected: /// Implementation of sequence container super class methods. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index edf5f14..4be4b02 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -687,15 +687,22 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) { // Codegen for materializing parsed data into tuples. The function WriteCompleteTuple is // codegen'd using the IRBuilder for the specific tuple description. This function // is then injected into the cross-compiled driving function, WriteAlignedTuples(). -Function* HdfsTextScanner::Codegen(HdfsScanNode* node, - const vector<ExprContext*>& conjunct_ctxs) { - if (!node->runtime_state()->codegen_enabled()) return NULL; +Status HdfsTextScanner::Codegen(HdfsScanNode* node, + const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) { + *write_aligned_tuples_fn = NULL; + if (!node->runtime_state()->codegen_enabled()) { + return Status("Disabled by query option."); + } LlvmCodeGen* codegen; - if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL; - Function* write_complete_tuple_fn = - CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs); - if (write_complete_tuple_fn == NULL) return NULL; - return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn); + RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen)); + Function* write_complete_tuple_fn; + RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs, + &write_complete_tuple_fn)); + DCHECK(write_complete_tuple_fn != NULL); + RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn, + write_aligned_tuples_fn)); + DCHECK(*write_aligned_tuples_fn != NULL); + return Status::OK(); } Status HdfsTextScanner::Open(ScannerContext* context) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index 2b9027c..b355ac8 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -60,9 +60,10 @@ class HdfsTextScanner : public HdfsScanner { static Status IssueInitialRanges(HdfsScanNode* scan_node, const std::vector<HdfsFileDesc*>& files); - /// Codegen writing tuples and evaluating predicates. - static llvm::Function* Codegen(HdfsScanNode*, - const std::vector<ExprContext*>& conjunct_ctxs); + /// Codegen WriteAlignedTuples(). Stores the resulting function in + /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise. + static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs, + llvm::Function** write_aligned_tuples_fn); /// Suffix for lzo index files. const static std::string LZO_INDEX_SUFFIX;
