Repository: incubator-impala
Updated Branches:
  refs/heads/master c23bf38a2 -> 9e438a08f


IMPALA-3629: Codegen TransferScratchTuples() in hdfs-parquet-scanner

We currently don't do any codegen in the hdfs-parquet scanner,
which limits performance. This patch creates a new function,
ProcessScratchBatch, which contains the inner loop of
TransferScratchTuples, allowing us to cross-compile only this
performance critical part. It also uses CodegenEvalConjuncts to
replace the call to EvalConjuncts in ProcessScratchBatch with a
codegen'd version.

Additionally, it modifies the Codegen functions in
hdfs-avro/text/sequence-scanner to take an output parameter for
the codegen'd function and return a Status in order to improve
logging around codegen failures.

Change-Id: Ic327e437c7cd2b3f92cdb11c1e907bfee2d44ee8
Reviewed-on: http://gerrit.cloudera.org:8080/4093
Reviewed-by: Thomas Tauber-Marshall <[email protected]>
Reviewed-by: Matthew Jacobs <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9e438a08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9e438a08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9e438a08

Branch: refs/heads/master
Commit: 9e438a08f1fa5761a88f0c9329bfedaeae3206d1
Parents: c23bf38
Author: Thomas Tauber-Marshall <[email protected]>
Authored: Tue Jul 26 11:28:00 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Tue Aug 23 21:57:55 2016 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py  |  1 +
 be/src/codegen/impala-ir.cc            |  1 +
 be/src/exec/CMakeLists.txt             |  1 +
 be/src/exec/hdfs-avro-scanner.cc       | 67 +++++++++++---------
 be/src/exec/hdfs-avro-scanner.h        | 24 ++++---
 be/src/exec/hdfs-parquet-scanner-ir.cc | 65 +++++++++++++++++++
 be/src/exec/hdfs-parquet-scanner.cc    | 98 ++++++++++++++++-------------
 be/src/exec/hdfs-parquet-scanner.h     | 14 +++++
 be/src/exec/hdfs-scan-node.cc          | 19 ++++--
 be/src/exec/hdfs-scanner.cc            | 37 +++++++----
 be/src/exec/hdfs-scanner.h             | 21 ++++---
 be/src/exec/hdfs-sequence-scanner.cc   | 23 ++++---
 be/src/exec/hdfs-sequence-scanner.h    |  7 ++-
 be/src/exec/hdfs-text-scanner.cc       | 23 ++++---
 be/src/exec/hdfs-text-scanner.h        |  7 ++-
 15 files changed, 278 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py 
b/be/src/codegen/gen_ir_descriptions.py
index 6c6761f..9ba9d78 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -113,6 +113,7 @@ ir_functions = [
   ["READ_AVRO_DECIMAL", "ReadAvroDecimal"],
   ["HDFS_SCANNER_WRITE_ALIGNED_TUPLES", "WriteAlignedTuples"],
   ["HDFS_SCANNER_GET_CONJUNCT_CTX", "GetConjunctCtx"],
+  ["PROCESS_SCRATCH_BATCH", "ProcessScratchBatch"],
   ["STRING_TO_BOOL", "IrStringToBool"],
   ["STRING_TO_INT8", "IrStringToInt8"],
   ["STRING_TO_INT16", "IrStringToInt16"],

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/codegen/impala-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc
index 2221051..185d6c2 100644
--- a/be/src/codegen/impala-ir.cc
+++ b/be/src/codegen/impala-ir.cc
@@ -26,6 +26,7 @@
 #include "exec/hash-join-node-ir.cc"
 #include "exec/hash-table-ir.cc"
 #include "exec/hdfs-avro-scanner-ir.cc"
+#include "exec/hdfs-parquet-scanner-ir.cc"
 #include "exec/hdfs-scanner-ir.cc"
 #include "exec/partitioned-aggregation-node-ir.cc"
 #include "exec/partitioned-hash-join-node-ir.cc"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index afb1a59..091b9b7 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -60,6 +60,7 @@ add_library(Exec
   hdfs-text-table-writer.cc
   hdfs-sequence-table-writer.cc
   hdfs-parquet-scanner.cc
+  hdfs-parquet-scanner-ir.cc
   hdfs-parquet-table-writer.cc
   hbase-scan-node.cc
   hbase-table-scanner.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 1c31718..ff278de 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -78,14 +78,21 @@ Status HdfsAvroScanner::Open(ScannerContext* context) {
   return Status::OK();
 }
 
-Function* HdfsAvroScanner::Codegen(HdfsScanNode* node,
-                                   const vector<ExprContext*>& conjunct_ctxs) {
-  if (!node->runtime_state()->codegen_enabled()) return NULL;
+Status HdfsAvroScanner::Codegen(HdfsScanNode* node,
+    const vector<ExprContext*>& conjunct_ctxs, Function** decode_avro_data_fn) 
{
+  *decode_avro_data_fn = NULL;
+  if (!node->runtime_state()->codegen_enabled()) {
+    return Status("Disabled by query option.");
+  }
   LlvmCodeGen* codegen;
-  if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
-  Function* materialize_tuple_fn = CodegenMaterializeTuple(node, codegen);
-  if (materialize_tuple_fn == NULL) return NULL;
-  return CodegenDecodeAvroData(node->runtime_state(), materialize_tuple_fn, 
conjunct_ctxs);
+  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  Function* materialize_tuple_fn;
+  RETURN_IF_ERROR(CodegenMaterializeTuple(node, codegen, 
&materialize_tuple_fn));
+  DCHECK(materialize_tuple_fn != NULL);
+  RETURN_IF_ERROR(CodegenDecodeAvroData(node->runtime_state(), 
materialize_tuple_fn,
+      conjunct_ctxs, decode_avro_data_fn));
+  DCHECK(*decode_avro_data_fn != NULL);
+  return Status::OK();
 }
 
 BaseSequenceScanner::FileHeader* HdfsAvroScanner::AllocateFileHeader() {
@@ -765,8 +772,8 @@ void 
HdfsAvroScanner::SetStatusValueOverflow(TErrorCode::type error_code, int64_
 // bail_out:           ; preds = %read_field11, %end_field3, %read_field2, 
%end_field,
 //   ret i1 false      ;         %read_field, %entry
 // }
-Function* HdfsAvroScanner::CodegenMaterializeTuple(
-    HdfsScanNode* node, LlvmCodeGen* codegen) {
+Status HdfsAvroScanner::CodegenMaterializeTuple(HdfsScanNode* node, 
LlvmCodeGen* codegen,
+    Function** materialize_tuple_fn) {
   LLVMContext& context = codegen->context();
   LlvmCodeGen::LlvmBuilder builder(context);
 
@@ -776,7 +783,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
 
   TupleDescriptor* tuple_desc = 
const_cast<TupleDescriptor*>(node->tuple_desc());
   StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
-  if (tuple_type == NULL) return NULL; // Could not generate tuple struct
+  if (tuple_type == NULL) return Status("Could not generate tuple struct.");
   Type* tuple_ptr_type = PointerType::get(tuple_type, 0);
 
   Type* tuple_opaque_type = codegen->GetType(Tuple::LLVM_CLASS_NAME);
@@ -814,7 +821,7 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
   if (!status.ok()) {
     VLOG_QUERY << status.GetDetail();
     fn->eraseFromParent();
-    return NULL;
+    return status;
   }
 
   // Returns true on successful decoding.
@@ -824,7 +831,11 @@ Function* HdfsAvroScanner::CodegenMaterializeTuple(
   builder.SetInsertPoint(bail_out_block);
   builder.CreateRet(codegen->false_value());
 
-  return codegen->FinalizeFunction(fn);
+  *materialize_tuple_fn = codegen->FinalizeFunction(fn);
+  if (*materialize_tuple_fn == NULL) {
+    return Status("Failed to finalize materialize_tuple_fn.");
+  }
+  return Status::OK();
 }
 
 Status HdfsAvroScanner::CodegenReadRecord(
@@ -1005,32 +1016,30 @@ Status HdfsAvroScanner::CodegenReadScalar(const 
AvroSchemaElement& element,
   return Status::OK();
 }
 
-// TODO: return Status
-Function* HdfsAvroScanner::CodegenDecodeAvroData(RuntimeState* state,
-    Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs) 
{
+Status HdfsAvroScanner::CodegenDecodeAvroData(RuntimeState* state,
+    Function* materialize_tuple_fn, const vector<ExprContext*>& conjunct_ctxs,
+    Function** decode_avro_data_fn) {
   LlvmCodeGen* codegen;
-  if (!state->GetCodegen(&codegen).ok()) return NULL;
+  RETURN_IF_ERROR(state->GetCodegen(&codegen));
   SCOPED_TIMER(codegen->codegen_timer());
   DCHECK(materialize_tuple_fn != NULL);
 
-  Function* decode_avro_data_fn =
-      codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true);
+  Function* fn = codegen->GetFunction(IRFunction::DECODE_AVRO_DATA, true);
 
-  int replaced = codegen->ReplaceCallSites(decode_avro_data_fn, 
materialize_tuple_fn,
-      "MaterializeTuple");
+  int replaced = codegen->ReplaceCallSites(fn, materialize_tuple_fn, 
"MaterializeTuple");
   DCHECK_EQ(replaced, 1);
 
   Function* eval_conjuncts_fn;
-  Status status =
-      ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs, &eval_conjuncts_fn);
-  if (!status.ok()) return NULL;
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(state, conjunct_ctxs,
+      &eval_conjuncts_fn));
 
-  replaced = codegen->ReplaceCallSites(decode_avro_data_fn, eval_conjuncts_fn,
-      "EvalConjuncts");
+  replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts");
   DCHECK_EQ(replaced, 1);
 
-  decode_avro_data_fn->setName("DecodeAvroData");
-  decode_avro_data_fn = codegen->FinalizeFunction(decode_avro_data_fn);
-  DCHECK(decode_avro_data_fn != NULL);
-  return decode_avro_data_fn;
+  fn->setName("DecodeAvroData");
+  *decode_avro_data_fn = codegen->FinalizeFunction(fn);
+  if (*decode_avro_data_fn == NULL) {
+    return Status("Failed to finalize decode_avro_data_fn.");
+  }
+  return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index fbb519f..f95cd10 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -93,9 +93,10 @@ class HdfsAvroScanner : public BaseSequenceScanner {
 
   virtual Status Open(ScannerContext* context);
 
-  /// Codegen parsing records, writing tuples and evaluating predicates.
-  static llvm::Function* Codegen(HdfsScanNode*,
-                                 const std::vector<ExprContext*>& 
conjunct_ctxs);
+  /// Codegen DecodeAvroData(). Stores the resulting function in 
'decode_avro_data_fn' if
+  /// codegen was successful or NULL otherwise.
+  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& 
conjunct_ctxs,
+      llvm::Function** decode_avro_data_fn);
 
  protected:
   /// Implementation of BaseSeqeunceScanner super class methods
@@ -193,16 +194,19 @@ class HdfsAvroScanner : public BaseSequenceScanner {
       uint8_t** data, uint8_t* data_end, Tuple* tuple);
 
   /// Produces a version of DecodeAvroData that uses codegen'd instead of 
interpreted
-  /// functions.
-  static llvm::Function* CodegenDecodeAvroData(
-      RuntimeState* state, llvm::Function* materialize_tuple_fn,
-      const std::vector<ExprContext*>& conjunct_ctxs);
+  /// functions. Stores the resulting function in 'decode_avro_data_fn' if 
codegen was
+  /// successful or returns an error.
+  static Status CodegenDecodeAvroData(RuntimeState* state,
+      llvm::Function* materialize_tuple_fn,
+      const std::vector<ExprContext*>& conjunct_ctxs,
+      llvm::Function** decode_avro_data_fn);
 
   /// Codegens a version of MaterializeTuple() that reads records based on the 
table
-  /// schema.
+  /// schema. Stores the resulting function in 'materialize_tuple_fn' if 
codegen was
+  /// successful or returns an error.
   /// TODO: Codegen a function for each unique file schema.
-  static llvm::Function* CodegenMaterializeTuple(HdfsScanNode* node,
-      LlvmCodeGen* codegen);
+  static Status CodegenMaterializeTuple(HdfsScanNode* node, LlvmCodeGen* 
codegen,
+      llvm::Function** materialize_tuple_fn);
 
   /// Used by CodegenMaterializeTuple to recursively create the IR for reading 
an Avro
   /// record.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-parquet-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-ir.cc 
b/be/src/exec/hdfs-parquet-scanner-ir.cc
new file mode 100644
index 0000000..1cd5252
--- /dev/null
+++ b/be/src/exec/hdfs-parquet-scanner-ir.cc
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-parquet-scanner.h"
+
+#include "exec/parquet-scratch-tuple-batch.h"
+#include "runtime/tuple-row.h"
+
+using namespace impala;
+
+int HdfsParquetScanner::ProcessScratchBatch(RowBatch* dst_batch) {
+  const bool has_filters = !filter_ctxs_.empty();
+  ExprContext* const* conjunct_ctxs = &(*scanner_conjunct_ctxs_)[0];
+  const int num_conjuncts = scanner_conjunct_ctxs_->size();
+
+  // Start/end/current iterators over the output rows.
+  Tuple** output_row_start =
+      reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
+  Tuple** output_row_end =
+      output_row_start + (dst_batch->capacity() - dst_batch->num_rows());
+  Tuple** output_row = output_row_start;
+
+  // Start/end/current iterators over the scratch tuples.
+  uint8_t* scratch_tuple_start = scratch_batch_->CurrTuple();
+  uint8_t* scratch_tuple_end = scratch_batch_->TupleEnd();
+  uint8_t* scratch_tuple = scratch_tuple_start;
+  const int tuple_size = scratch_batch_->tuple_byte_size;
+
+  // Loop until the scratch batch is exhausted or the output batch is full.
+  // Do not use batch_->AtCapacity() in this loop because it is not necessary
+  // to perform the memory capacity check.
+  while (scratch_tuple != scratch_tuple_end) {
+    *output_row = reinterpret_cast<Tuple*>(scratch_tuple);
+    scratch_tuple += tuple_size;
+    // Evaluate runtime filters and conjuncts. Short-circuit the evaluation if
+    // the filters/conjuncts are empty to avoid function calls.
+    if (has_filters && 
!EvalRuntimeFilters(reinterpret_cast<TupleRow*>(output_row))) {
+      continue;
+    }
+    if (!ExecNode::EvalConjuncts(conjunct_ctxs, num_conjuncts,
+        reinterpret_cast<TupleRow*>(output_row))) {
+      continue;
+    }
+    // Row survived runtime filters and conjuncts.
+    ++output_row;
+    if (output_row == output_row_end) break;
+  }
+  scratch_batch_->tuple_idx += (scratch_tuple - scratch_tuple_start) / 
tuple_size;
+
+  return output_row - output_row_start;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index 2aaf8de..30c07a4 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -23,6 +23,7 @@
 #include <gflags/gflags.h>
 #include <gutil/strings/substitute.h>
 
+#include "codegen/llvm-codegen.h"
 #include "common/logging.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/hdfs-scan-node.h"
@@ -45,6 +46,7 @@
 
 #include "common/names.h"
 
+using llvm::Function;
 using strings::Substitute;
 using namespace impala;
 
@@ -151,7 +153,8 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNode* 
scan_node, RuntimeState* st
       dictionary_pool_(new MemPool(scan_node->mem_tracker())),
       assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
       num_cols_counter_(NULL),
-      num_row_groups_counter_(NULL) {
+      num_row_groups_counter_(NULL),
+      codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
 
@@ -164,7 +167,13 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   num_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumRowGroups", TUnit::UNIT);
 
-  scan_node_->IncNumScannersCodegenDisabled();
+  codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
+      scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
+  if (codegend_process_scratch_batch_fn_ == NULL) {
+    scan_node_->IncNumScannersCodegenDisabled();
+  } else {
+    scan_node_->IncNumScannersCodegenEnabled();
+  }
 
   level_cache_pool_.reset(new MemPool(scan_node_->mem_tracker()));
 
@@ -562,33 +571,17 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* 
dst_batch) {
   // we always call CommitRows() after TransferScratchTuples(), the output 
batch can
   // never be empty.
   DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
-
-  const bool has_filters = !filter_ctxs_.empty();
-  const bool has_conjuncts = !scanner_conjunct_ctxs_->empty();
-  ExprContext* const* conjunct_ctxs = &(*scanner_conjunct_ctxs_)[0];
-  const int num_conjuncts = scanner_conjunct_ctxs_->size();
-
-  // Start/end/current iterators over the output rows.
   DCHECK_EQ(scan_node_->tuple_idx(), 0);
   DCHECK_EQ(dst_batch->row_desc().tuple_descriptors().size(), 1);
-  Tuple** output_row_start =
-      reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
-  Tuple** output_row_end =
-      output_row_start + (dst_batch->capacity() - dst_batch->num_rows());
-  Tuple** output_row = output_row_start;
-
-  // Start/end/current iterators over the scratch tuples.
-  uint8_t* scratch_tuple_start = scratch_batch_->CurrTuple();
-  uint8_t* scratch_tuple_end = scratch_batch_->TupleEnd();
-  uint8_t* scratch_tuple = scratch_tuple_start;
-  const int tuple_size = scratch_batch_->tuple_byte_size;
-
-  if (tuple_size == 0) {
+
+  if (scratch_batch_->tuple_byte_size == 0) {
+    Tuple** output_row =
+        reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));
     // We are materializing a collection with empty tuples. Add a NULL tuple 
to the
     // output batch per remaining scratch tuple and return. No need to evaluate
     // filters/conjuncts.
-    DCHECK(!has_filters);
-    DCHECK(!has_conjuncts);
+    DCHECK(filter_ctxs_.empty());
+    DCHECK(scanner_conjunct_ctxs_->empty());
     int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(),
         scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
     memset(output_row, 0, num_tuples * sizeof(Tuple*));
@@ -599,26 +592,12 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* 
dst_batch) {
     return num_tuples;
   }
 
-  // Loop until the scratch batch is exhausted or the output batch is full.
-  // Do not use batch_->AtCapacity() in this loop because it is not necessary
-  // to perform the memory capacity check.
-  while (scratch_tuple != scratch_tuple_end) {
-    *output_row = reinterpret_cast<Tuple*>(scratch_tuple);
-    scratch_tuple += tuple_size;
-    // Evaluate runtime filters and conjuncts. Short-circuit the evaluation if
-    // the filters/conjuncts are empty to avoid function calls.
-    if (has_filters && 
!EvalRuntimeFilters(reinterpret_cast<TupleRow*>(output_row))) {
-      continue;
-    }
-    if (has_conjuncts && !ExecNode::EvalConjuncts(
-        conjunct_ctxs, num_conjuncts, 
reinterpret_cast<TupleRow*>(output_row))) {
-      continue;
-    }
-    // Row survived runtime filters and conjuncts.
-    ++output_row;
-    if (output_row == output_row_end) break;
+  int num_row_to_commit;
+  if (codegend_process_scratch_batch_fn_ != NULL) {
+    num_row_to_commit = codegend_process_scratch_batch_fn_(this, dst_batch);
+  } else {
+    num_row_to_commit = ProcessScratchBatch(dst_batch);
   }
-  scratch_batch_->tuple_idx += (scratch_tuple - scratch_tuple_start) / 
tuple_size;
 
   // TODO: Consider compacting the output row batch to better handle cases 
where
   // there are few surviving tuples per scratch batch. In such cases, we could
@@ -627,7 +606,38 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* 
dst_batch) {
   if (scratch_batch_->AtEnd()) {
     dst_batch->tuple_data_pool()->AcquireData(scratch_batch_->mem_pool(), 
false);
   }
-  return output_row - output_row_start;
+  return num_row_to_commit;
+}
+
+Status HdfsParquetScanner::Codegen(HdfsScanNode* node,
+    const vector<ExprContext*>& conjunct_ctxs, Function** 
process_scratch_batch_fn) {
+  *process_scratch_batch_fn = NULL;
+  if (!node->runtime_state()->codegen_enabled()) {
+    return Status("Disabled by query option.");
+  }
+
+  LlvmCodeGen* codegen;
+  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  DCHECK(codegen != NULL);
+  SCOPED_TIMER(codegen->codegen_timer());
+
+  Function* fn = codegen->GetFunction(IRFunction::PROCESS_SCRATCH_BATCH, true);
+  DCHECK(fn != NULL);
+
+  Function* eval_conjuncts_fn;
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(node->runtime_state(), 
conjunct_ctxs,
+      &eval_conjuncts_fn));
+  DCHECK(eval_conjuncts_fn != NULL);
+
+  int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, 
"EvalConjuncts");
+  DCHECK_EQ(replaced, 1);
+
+  fn->setName("ProcessScratchBatch");
+  *process_scratch_batch_fn = codegen->FinalizeFunction(fn);
+  if (*process_scratch_batch_fn == NULL) {
+    return Status("Failed to finalize process_scratch_batch_fn.");
+  }
+  return Status::OK();
 }
 
 bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h 
b/be/src/exec/hdfs-parquet-scanner.h
index dfd7785..425444f 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -333,6 +333,11 @@ class HdfsParquetScanner : public HdfsScanner {
   virtual Status ProcessSplit();
   virtual void Close(RowBatch* row_batch);
 
+  /// Codegen ProcessScratchBatch(). Stores the resulting function in
+  /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
+  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& 
conjunct_ctxs,
+      llvm::Function** process_scratch_batch_fn);
+
   /// The repetition level is set to this value to indicate the end of a row 
group.
   static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
   /// Indicates an invalid definition or repetition level.
@@ -427,6 +432,10 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Number of row groups that need to be read.
   RuntimeProfile::Counter* num_row_groups_counter_;
 
+  typedef int (*ProcessScratchBatchFn)(HdfsParquetScanner*, RowBatch*);
+  /// The codegen'd version of ProcessScratchBatch() if available, NULL 
otherwise.
+  ProcessScratchBatchFn codegend_process_scratch_batch_fn_;
+
   const char* filename() const { return metadata_range_->file(); }
 
   virtual Status GetNextInternal(RowBatch* row_batch, bool* eos);
@@ -464,6 +473,11 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Returns the number of rows that should be committed to the given batch.
   int TransferScratchTuples(RowBatch* dst_batch);
 
+  /// Processes a single row batch for TransferScratchTuples, looping over 
scratch_batch_
+  /// until it is exhausted or the output is full. Called for the case when 
there are
+  /// materialized tuples. This is a separate function so it can be codegened.
+  int ProcessScratchBatch(RowBatch* dst_batch);
+
   /// Evaluates runtime filters (if any) against the given row. Returns true if
   /// they passed, false otherwise. Maintains the runtime filter stats, 
determines
   /// whether the filters are effective, and disables them if they are not.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 6077f99..9ed78de 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -672,21 +672,32 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
     // Create reusable codegen'd functions for each file type type needed
     // TODO: do this for conjuncts_map_
     Function* fn;
+    Status status;
     switch (format) {
       case THdfsFileFormat::TEXT:
-        fn = HdfsTextScanner::Codegen(this, conjunct_ctxs_);
+        status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn);
         break;
       case THdfsFileFormat::SEQUENCE_FILE:
-        fn = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_);
+        status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn);
         break;
       case THdfsFileFormat::AVRO:
-        fn = HdfsAvroScanner::Codegen(this, conjunct_ctxs_);
+        status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn);
+        break;
+      case THdfsFileFormat::PARQUET:
+        status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, &fn);
         break;
       default:
         // No codegen for this format
         fn = NULL;
+        status = Status("Not implemented for this format.");
     }
-    if (fn != NULL) {
+    DCHECK(fn != NULL || !status.ok());
+
+    const char* format_name = 
_THdfsFileFormat_VALUES_TO_NAMES.find(format)->second;
+    if (!status.ok()) {
+      AddCodegenExecOption(false, status, format_name);
+    } else {
+      AddCodegenExecOption(true, status, format_name);
       LlvmCodeGen* codegen;
       RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen));
       codegen->AddFunctionToJit(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 81cd1f0..b0a30d9 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -357,16 +357,21 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, 
FieldLocation* fields,
 // eval_fail:                                        ; preds = %parse
 //   ret i1 false
 // }
-Function* HdfsScanner::CodegenWriteCompleteTuple(
-    HdfsScanNode* node, LlvmCodeGen* codegen, const vector<ExprContext*>& 
conjunct_ctxs) {
+Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* 
codegen,
+    const vector<ExprContext*>& conjunct_ctxs, Function** 
write_complete_tuple_fn) {
+  *write_complete_tuple_fn = NULL;
   SCOPED_TIMER(codegen->codegen_timer());
   RuntimeState* state = node->runtime_state();
 
   // TODO: Timestamp is not yet supported
   for (int i = 0; i < node->materialized_slots().size(); ++i) {
     SlotDescriptor* slot_desc = node->materialized_slots()[i];
-    if (slot_desc->type().type == TYPE_TIMESTAMP) return NULL;
-    if (slot_desc->type().type == TYPE_DECIMAL) return NULL;
+    if (slot_desc->type().type == TYPE_TIMESTAMP) {
+      return Status("Timestamp not yet supported for codegen.");
+    }
+    if (slot_desc->type().type == TYPE_DECIMAL) {
+      return Status("Decimal not yet supported for codegen.");
+    }
   }
 
   // Cast away const-ness.  The codegen only sets the cached typed llvm struct.
@@ -377,7 +382,7 @@ Function* HdfsScanner::CodegenWriteCompleteTuple(
     Function* fn = TextConverter::CodegenWriteSlot(codegen, tuple_desc, 
slot_desc,
         node->hdfs_table()->null_column_value().data(),
         node->hdfs_table()->null_column_value().size(), true, 
state->strict_mode());
-    if (fn == NULL) return NULL;
+    if (fn == NULL) return Status("CodegenWriteSlot failed.");
     slot_fns.push_back(fn);
   }
 
@@ -409,7 +414,7 @@ Function* HdfsScanner::CodegenWriteCompleteTuple(
 
   // Generate the typed llvm struct for the output tuple
   StructType* tuple_type = tuple_desc->GetLlvmStruct(codegen);
-  if (tuple_type == NULL) return NULL;
+  if (tuple_type == NULL) return Status("Could not generate tuple struct.");
   PointerType* tuple_ptr_type = PointerType::get(tuple_type, 0);
 
   // Initialize the function prototype.  This needs to match
@@ -533,7 +538,7 @@ Function* HdfsScanner::CodegenWriteCompleteTuple(
         ss << "Failed to codegen conjunct: " << status.GetDetail();
         state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
         fn->eraseFromParent();
-        return NULL;
+        return status;
       }
 
       Function* get_ctx_fn =
@@ -553,12 +558,16 @@ Function* HdfsScanner::CodegenWriteCompleteTuple(
   builder.SetInsertPoint(eval_fail_block);
   builder.CreateRet(codegen->false_value());
 
-  codegen->FinalizeFunction(fn);
-  return codegen->FinalizeFunction(fn);
+  *write_complete_tuple_fn = codegen->FinalizeFunction(fn);
+  if (*write_complete_tuple_fn == NULL) {
+    return Status("Failed to finalize write_complete_tuple_fn.");
+  }
+  return Status::OK();
 }
 
-Function* HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node,
-    LlvmCodeGen* codegen, Function* write_complete_tuple_fn) {
+Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node, LlvmCodeGen* 
codegen,
+    Function* write_complete_tuple_fn, Function** write_aligned_tuples_fn) {
+  *write_aligned_tuples_fn = NULL;
   SCOPED_TIMER(codegen->codegen_timer());
   DCHECK(write_complete_tuple_fn != NULL);
 
@@ -570,7 +579,11 @@ Function* 
HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node,
       "WriteCompleteTuple");
   DCHECK_EQ(replaced, 1);
 
-  return codegen->FinalizeFunction(write_tuples_fn);
+  *write_aligned_tuples_fn = codegen->FinalizeFunction(write_tuples_fn);
+  if (*write_aligned_tuples_fn == NULL) {
+    return Status("Failed to finalize write_aligned_tuples_fn.");
+  }
+  return Status::OK();
 }
 
 Status HdfsScanner::UpdateDecompressor(const THdfsCompression::type& 
compression) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 7ddf0a5..b7bc89b 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -413,15 +413,18 @@ class HdfsScanner {
       uint8_t* error_in_row);
 
   /// Codegen function to replace WriteCompleteTuple. Should behave identically
-  /// to WriteCompleteTuple.
-  static llvm::Function* CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*,
-      const std::vector<ExprContext*>& conjunct_ctxs);
-
-  /// Codegen function to replace WriteAlignedTuples.  WriteAlignedTuples is cross compiled
-  /// to IR.  This function loads the precompiled IR function, modifies it and returns the
-  /// resulting function.
-  static llvm::Function* CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*,
-      llvm::Function* write_tuple_fn);
+  /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
+  /// if codegen was successful or NULL otherwise.
+  static Status CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*,
+      const std::vector<ExprContext*>& conjunct_ctxs,
+      llvm::Function** write_complete_tuple_fn);
+
+  /// Codegen function to replace WriteAlignedTuples.  WriteAlignedTuples is cross
+  /// compiled to IR.  This function loads the precompiled IR function, modifies it,
+  /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was
+  /// successful or NULL otherwise.
+  static Status CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*,
+      llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn);
 
   /// Report parse error for column @ desc.   If abort_on_error is true, sets
   /// parse_status_ to the error message.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 1bc0826..b5542f6 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -52,15 +52,22 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 }
 
 // Codegen for materialized parsed data into tuples.
-Function* HdfsSequenceScanner::Codegen(HdfsScanNode* node,
-    const vector<ExprContext*>& conjunct_ctxs) {
-  if (!node->runtime_state()->codegen_enabled()) return NULL;
+Status HdfsSequenceScanner::Codegen(HdfsScanNode* node,
+    const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
+  *write_aligned_tuples_fn = NULL;
+  if (!node->runtime_state()->codegen_enabled()) {
+    return Status("Disabled by query option.");
+  }
   LlvmCodeGen* codegen;
-  if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
-  Function* write_complete_tuple_fn =
-      CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs);
-  if (write_complete_tuple_fn == NULL) return NULL;
-  return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn);
+  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  Function* write_complete_tuple_fn;
+  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
+      &write_complete_tuple_fn));
+  DCHECK(write_complete_tuple_fn != NULL);
+  RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
+      write_aligned_tuples_fn));
+  DCHECK(*write_aligned_tuples_fn != NULL);
+  return Status::OK();
 }
 
 Status HdfsSequenceScanner::InitNewRange() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 2663f27..1246b5a 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -170,9 +170,10 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   /// Implementation of HdfsScanner interface.
   virtual Status Open(ScannerContext* context);
 
-  /// Codegen writing tuples and evaluating predicates.
-  static llvm::Function* Codegen(HdfsScanNode*,
-                                 const std::vector<ExprContext*>& conjunct_ctxs);
+  /// Codegen WriteAlignedTuples(). Stores the resulting function in
+  /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
+  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+      llvm::Function** write_aligned_tuples_fn);
 
  protected:
   /// Implementation of sequence container super class methods.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index edf5f14..4be4b02 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -687,15 +687,22 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
// Codegen for materializing parsed data into tuples.  The function WriteCompleteTuple is
// codegen'd using the IRBuilder for the specific tuple description.  This function
// is then injected into the cross-compiled driving function, WriteAlignedTuples().
-Function* HdfsTextScanner::Codegen(HdfsScanNode* node,
-    const vector<ExprContext*>& conjunct_ctxs) {
-  if (!node->runtime_state()->codegen_enabled()) return NULL;
+Status HdfsTextScanner::Codegen(HdfsScanNode* node,
+    const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
+  *write_aligned_tuples_fn = NULL;
+  if (!node->runtime_state()->codegen_enabled()) {
+    return Status("Disabled by query option.");
+  }
   LlvmCodeGen* codegen;
-  if (!node->runtime_state()->GetCodegen(&codegen).ok()) return NULL;
-  Function* write_complete_tuple_fn =
-      CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs);
-  if (write_complete_tuple_fn == NULL) return NULL;
-  return CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn);
+  RETURN_IF_ERROR(node->runtime_state()->GetCodegen(&codegen));
+  Function* write_complete_tuple_fn;
+  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
+      &write_complete_tuple_fn));
+  DCHECK(write_complete_tuple_fn != NULL);
+  RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
+      write_aligned_tuples_fn));
+  DCHECK(*write_aligned_tuples_fn != NULL);
+  return Status::OK();
 }
 
 Status HdfsTextScanner::Open(ScannerContext* context) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9e438a08/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 2b9027c..b355ac8 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -60,9 +60,10 @@ class HdfsTextScanner : public HdfsScanner {
   static Status IssueInitialRanges(HdfsScanNode* scan_node,
                                    const std::vector<HdfsFileDesc*>& files);
 
-  /// Codegen writing tuples and evaluating predicates.
-  static llvm::Function* Codegen(HdfsScanNode*,
-                                 const std::vector<ExprContext*>& conjunct_ctxs);
+  /// Codegen WriteAlignedTuples(). Stores the resulting function in
+  /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
+  static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs,
+      llvm::Function** write_aligned_tuples_fn);
 
   /// Suffix for lzo index files.
   const static std::string LZO_INDEX_SUFFIX;


Reply via email to