IMPALA-5717: Support for reading ORC data files

This patch integrates the ORC library into Impala and implements
HdfsOrcScanner as a middle layer between them. HdfsOrcScanner
supplies the input needed by the ORC reader, tracks the reader's memory
consumption, and transfers the reader's output (orc::ColumnVectorBatch)
into impala::RowBatch. The ORC version used is release-1.4.3.

A startup option, --enable_orc_scanner, is added for this feature. It
is set to true by default; setting it to false causes queries on ORC
tables to fail.

Currently, only reading primitive types is supported. Writing into ORC
tables is not supported either.

Tests
 - Most of the end-to-end tests can run on ORC format.
 - Add TPC-DS and TPC-H tests for ORC.
 - Add some ORC-specific tests.
 - test_scanner_fuzz is not enabled for ORC yet, since the ORC library
   is not robust against corrupt files (ORC-315).

Change-Id: Ia7b6ae4ce3b9ee8125b21993702faa87537790a4
Reviewed-on: http://gerrit.cloudera.org:8080/9134
Reviewed-by: Quanlong Huang <huangquanl...@gmail.com>
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9988


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/23743baa
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/23743baa
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/23743baa

Branch: refs/heads/2.x
Commit: 23743baa87863037fd21fffbaf031990aef131a7
Parents: 83422a3
Author: stiga-huang <huangquanl...@gmail.com>
Authored: Thu Jan 25 06:39:25 2018 -0800
Committer: Joe McDonnell <joemcdonn...@cloudera.com>
Committed: Fri Apr 13 03:25:23 2018 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |   5 +
 be/CMakeLists.txt                               |   2 +
 be/src/codegen/gen_ir_descriptions.py           |   4 +-
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/hdfs-orc-scanner.cc                 | 763 +++++++++++++++++++
 be/src/exec/hdfs-orc-scanner.h                  | 224 ++++++
 be/src/exec/hdfs-parquet-scanner-ir.cc          |  14 -
 be/src/exec/hdfs-parquet-scanner.cc             | 185 +----
 be/src/exec/hdfs-parquet-scanner.h              |  65 --
 be/src/exec/hdfs-scan-node-base.cc              |   6 +
 be/src/exec/hdfs-scan-node-mt.cc                |   1 +
 be/src/exec/hdfs-scanner-ir.cc                  |  14 +
 be/src/exec/hdfs-scanner.cc                     | 179 +++++
 be/src/exec/hdfs-scanner.h                      |  76 ++
 be/src/util/backend-gflag-util.cc               |   2 +
 bin/bootstrap_toolchain.py                      |   2 +-
 bin/impala-config.sh                            |   2 +
 cmake_modules/FindOrc.cmake                     |  55 ++
 common/thrift/BackendGflags.thrift              |   2 +
 common/thrift/CatalogObjects.thrift             |   6 +-
 fe/src/main/cup/sql-parser.cup                  |   8 +-
 .../impala/analysis/ComputeStatsStmt.java       |   8 +-
 .../apache/impala/catalog/HdfsFileFormat.java   |  38 +-
 .../impala/catalog/HdfsStorageDescriptor.java   |   1 +
 .../org/apache/impala/planner/HdfsScanNode.java |   8 +-
 .../apache/impala/service/BackendConfig.java    |   2 +
 .../org/apache/impala/service/Frontend.java     |   4 +-
 fe/src/main/jflex/sql-scanner.flex              |   1 +
 testdata/LineItemMultiBlock/README.dox          |  19 +
 .../lineitem_orc_multiblock_one_stripe.orc      | Bin 0 -> 424277 bytes
 .../LineItemMultiBlock/lineitem_sixblocks.orc   | Bin 0 -> 863581 bytes
 .../LineItemMultiBlock/lineitem_threeblocks.orc | Bin 0 -> 465929 bytes
 testdata/bin/create-load-data.sh                |   3 +
 testdata/bin/generate-schema-statements.py      |   7 +-
 testdata/bin/run-hive-server.sh                 |   5 +-
 .../common/etc/hadoop/conf/hdfs-site.xml.tmpl   |   6 +
 testdata/data/chars-formats.orc                 | Bin 0 -> 1411 bytes
 .../functional/functional_schema_template.sql   |   2 +
 .../datasets/functional/schema_constraints.csv  |   3 +
 .../PlannerTest/complex-types-file-formats.test |  57 +-
 .../functional-query/functional-query_core.csv  |   1 +
 .../functional-query_dimensions.csv             |   2 +-
 .../functional-query_exhaustive.csv             |   1 +
 .../functional-query_pairwise.csv               |   1 +
 .../queries/DataErrorsTest/orc-type-checks.test | 127 +++
 testdata/workloads/tpcds/tpcds_core.csv         |   1 +
 testdata/workloads/tpcds/tpcds_dimensions.csv   |   2 +-
 testdata/workloads/tpcds/tpcds_exhaustive.csv   |   3 +
 testdata/workloads/tpcds/tpcds_pairwise.csv     |   3 +
 testdata/workloads/tpch/tpch_core.csv           |   1 +
 testdata/workloads/tpch/tpch_dimensions.csv     |   2 +-
 testdata/workloads/tpch/tpch_exhaustive.csv     |   3 +
 testdata/workloads/tpch/tpch_pairwise.csv       |   3 +
 tests/common/impala_test_suite.py               |   2 +-
 tests/common/test_dimensions.py                 |   2 +-
 tests/common/test_vector.py                     |   2 +-
 tests/comparison/cli_options.py                 |   2 +-
 tests/query_test/test_chars.py                  |   6 +
 tests/query_test/test_decimal_queries.py        |   2 +-
 tests/query_test/test_scanners.py               | 102 ++-
 tests/query_test/test_scanners_fuzz.py          |   2 +
 tests/query_test/test_tpch_queries.py           |   2 +-
 62 files changed, 1745 insertions(+), 307 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 612e00c..43cd258 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -76,6 +76,7 @@ endfunction()
 # set_dep_root(PACKAGE) ->
 #   PACKAGE_ROOT set to 
$ENV{IMPALA_TOOLCHAIN}/PACKAGE-$ENV{IMPALA_PACKAGE_VERSION}
 set_dep_root(AVRO)
+set_dep_root(ORC)
 set_dep_root(BOOST)
 set_dep_root(BREAKPAD)
 set_dep_root(BZIP2)
@@ -272,6 +273,10 @@ message(STATUS "RapidJson include dir: " 
${RAPIDJSON_INCLUDE_DIR})
 find_package(Avro REQUIRED)
 IMPALA_ADD_THIRDPARTY_LIB(avro ${AVRO_INCLUDE_DIR} ${AVRO_STATIC_LIB} "")
 
+# find ORC headers and libs
+find_package(Orc REQUIRED)
+IMPALA_ADD_THIRDPARTY_LIB(orc ${ORC_INCLUDE_DIR} ${ORC_STATIC_LIB} "")
+
 # find protobuf headers, libs and compiler
 find_package(Protobuf REQUIRED)
 IMPALA_ADD_THIRDPARTY_LIB(protobuf ${PROTOBUF_INCLUDE_DIR} 
${PROTOBUF_STATIC_LIBRARY}

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index b6e10b0..8e4f8bd 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -295,6 +295,7 @@ set(CLANG_INCLUDE_FLAGS
   "-I${GFLAGS_INCLUDE_DIR}"
   "-I${RAPIDJSON_INCLUDE_DIR}"
   "-I${AVRO_INCLUDE_DIR}"
+  "-I${ORC_INCLUDE_DIR}"
   # Include Boost as a system directory to suppress warnings from headers.
   "-isystem${BOOST_INCLUDEDIR}"
   # Required so that jni.h can be found during Clang compilation
@@ -447,6 +448,7 @@ set (IMPALA_DEPENDENCIES
   zlib
   bzip2
   avro
+  orc
   java_jvm
   kudu_client)
 

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py 
b/be/src/codegen/gen_ir_descriptions.py
index 1d0f38e..26a8ad7 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -177,8 +177,8 @@ ir_functions = [
    
"_ZN6impala11HdfsScanner18WriteAlignedTuplesEPNS_7MemPoolEPNS_8TupleRowEPNS_13FieldLocationEiiiib"],
   ["PROCESS_SCRATCH_BATCH",
    "_ZN6impala18HdfsParquetScanner19ProcessScratchBatchEPNS_8RowBatchE"],
-  ["PARQUET_SCANNER_EVAL_RUNTIME_FILTER",
-   "_ZN6impala18HdfsParquetScanner17EvalRuntimeFilterEiPNS_8TupleRowE"],
+  ["HDFS_SCANNER_EVAL_RUNTIME_FILTER",
+   "_ZN6impala11HdfsScanner17EvalRuntimeFilterEiPNS_8TupleRowE"],
   ["STRING_TO_BOOL", "IrStringToBool"],
   ["STRING_TO_INT8", "IrStringToInt8"],
   ["STRING_TO_INT16", "IrStringToInt16"],

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index ddd84ee..7224df8 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -59,6 +59,7 @@ add_library(Exec
   hdfs-parquet-scanner.cc
   hdfs-parquet-scanner-ir.cc
   hdfs-parquet-table-writer.cc
+  hdfs-orc-scanner.cc
   hbase-scan-node.cc
   hbase-table-scanner.cc
   incr-stats-util.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-orc-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
new file mode 100644
index 0000000..3660600
--- /dev/null
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -0,0 +1,763 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-orc-scanner.h"
+
+#include <queue>
+
+#include "exec/scanner-context.inline.h"
+#include "exprs/expr.h"
+#include "runtime/runtime-filter.inline.h"
+#include "runtime/tuple-row.h"
+#include "util/decompress.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::io;
+
+DEFINE_bool(enable_orc_scanner, true,
+    "If false, reading from ORC format tables is not supported");
+
+Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
+    const vector<HdfsFileDesc*>& files) {
+  for (HdfsFileDesc* file : files) {
+    // If the file size is less than 10 bytes, it is an invalid ORC file.
+    if (file->file_length < 10) {
+      return Status(Substitute("ORC file $0 has an invalid file length: $1",
+          file->filename, file->file_length));
+    }
+  }
+  return IssueFooterRanges(scan_node, THdfsFileFormat::ORC, files);
+}
+
+namespace impala {
+
+HdfsOrcScanner::OrcMemPool::OrcMemPool(HdfsOrcScanner* scanner)
+    : scanner_(scanner), mem_tracker_(scanner_->scan_node_->mem_tracker()) {
+}
+
+HdfsOrcScanner::OrcMemPool::~OrcMemPool() {
+  FreeAll();
+}
+
+void HdfsOrcScanner::OrcMemPool::FreeAll() {
+  int64_t total_bytes_released = 0;
+  for (auto it = chunk_sizes_.begin(); it != chunk_sizes_.end(); ++it) {
+    std::free(it->first);
+    total_bytes_released += it->second;
+  }
+  mem_tracker_->Release(total_bytes_released);
+  chunk_sizes_.clear();
+  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(-total_bytes_released);
+}
+
+// orc-reader will not check the malloc result. We throw an exception if we 
can't
+// malloc to stop the orc-reader.
+char* HdfsOrcScanner::OrcMemPool::malloc(uint64_t size) {
+  if (!mem_tracker_->TryConsume(size)) {
+    throw ResourceError(mem_tracker_->MemLimitExceeded(
+        scanner_->state_, "Failed to allocate memory required by ORC library", 
size));
+  }
+  char* addr = static_cast<char*>(std::malloc(size));
+  if (addr == nullptr) {
+    mem_tracker_->Release(size);
+    throw ResourceError(Status(TErrorCode::MEM_ALLOC_FAILED, size));
+  }
+  chunk_sizes_[addr] = size;
+  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(size);
+  return addr;
+}
+
+void HdfsOrcScanner::OrcMemPool::free(char* p) {
+  DCHECK(chunk_sizes_.find(p) != chunk_sizes_.end()) << "invalid free!" << endl
+       << GetStackTrace();
+  std::free(p);
+  int64_t size = chunk_sizes_[p];
+  mem_tracker_->Release(size);
+  ImpaladMetrics::MEM_POOL_TOTAL_BYTES->Increment(-size);
+  chunk_sizes_.erase(p);
+}
+
+// TODO: improve this to use async IO (IMPALA-6636).
+void HdfsOrcScanner::ScanRangeInputStream::read(void* buf, uint64_t length,
+    uint64_t offset) {
+  const ScanRange* metadata_range = scanner_->metadata_range_;
+  const ScanRange* split_range =
+      
reinterpret_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+  int64_t partition_id = scanner_->context_->partition_descriptor()->id();
+
+  // Set expected_local to false to avoid cache on stale data (IMPALA-6830)
+  bool expected_local = false;
+  ScanRange* range = scanner_->scan_node_->AllocateScanRange(
+      metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
+      split_range->disk_id(), expected_local,
+      BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length));
+
+  unique_ptr<BufferDescriptor> io_buffer;
+  Status status;
+  {
+    SCOPED_TIMER(scanner_->state_->total_storage_wait_timer());
+    status = scanner_->state_->io_mgr()->Read(
+        scanner_->scan_node_->reader_context(), range, &io_buffer);
+  }
+  if (io_buffer != nullptr) 
scanner_->state_->io_mgr()->ReturnBuffer(move(io_buffer));
+  if (!status.ok()) throw ResourceError(status);
+}
+
+HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* 
state)
+  : HdfsScanner(scan_node, state),
+    assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
+  assemble_rows_timer_.Stop();
+}
+
+HdfsOrcScanner::~HdfsOrcScanner() {
+}
+
+Status HdfsOrcScanner::Open(ScannerContext* context) {
+  RETURN_IF_ERROR(HdfsScanner::Open(context));
+  metadata_range_ = stream_->scan_range();
+  num_cols_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcColumns", TUnit::UNIT);
+  num_stripes_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumOrcStripes", TUnit::UNIT);
+  num_scanners_with_no_reads_counter_ =
+      ADD_COUNTER(scan_node_->runtime_profile(), "NumScannersWithNoReads", 
TUnit::UNIT);
+  process_footer_timer_stats_ =
+      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), 
"OrcFooterProcessingTime");
+  scan_node_->IncNumScannersCodegenDisabled();
+
+  DCHECK(parse_status_.ok()) << "Invalid parse_status_" << 
parse_status_.GetDetail();
+  for (const FilterContext& ctx : context->filter_ctxs()) {
+    DCHECK(ctx.filter != nullptr);
+    filter_ctxs_.push_back(&ctx);
+  }
+  filter_stats_.resize(filter_ctxs_.size());
+  reader_mem_pool_.reset(new OrcMemPool(this));
+  reader_options_.setMemoryPool(*reader_mem_pool_);
+
+  // Each scan node can process multiple splits. Each split processes the 
footer once.
+  // We use a timer to measure the time taken to ProcessFileTail() per split 
and add
+  // this time to the averaged timer.
+  MonotonicStopWatch single_footer_process_timer;
+  single_footer_process_timer.Start();
+  // First process the file metadata in the footer.
+  Status footer_status = ProcessFileTail();
+  single_footer_process_timer.Stop();
+  
process_footer_timer_stats_->UpdateCounter(single_footer_process_timer.ElapsedTime());
+
+  // Release I/O buffers immediately to make sure they are cleaned up
+  // in case we return a non-OK status anywhere below.
+  context_->ReleaseCompletedResources(true);
+  RETURN_IF_ERROR(footer_status);
+
+  // Update orc reader options base on the tuple descriptor
+  RETURN_IF_ERROR(SelectColumns(scan_node_->tuple_desc()));
+
+  // Set top-level template tuple.
+  template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
+  return Status::OK();
+}
+
+void HdfsOrcScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
+  if (row_batch != nullptr) {
+    context_->ReleaseCompletedResources(true);
+    row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), 
false);
+    if (scan_node_->HasRowBatchQueue()) {
+      static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
+          unique_ptr<RowBatch>(row_batch));
+    }
+  } else {
+    template_tuple_pool_->FreeAll();
+    context_->ReleaseCompletedResources(true);
+  }
+  scratch_batch_.reset(nullptr);
+
+  // Verify all resources (if any) have been transferred.
+  DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
+
+  assemble_rows_timer_.Stop();
+  assemble_rows_timer_.ReleaseCounter();
+
+  THdfsCompression::type compression_type = THdfsCompression::NONE;
+  if (reader_ != nullptr) {
+    compression_type = TranslateCompressionKind(reader_->getCompression());
+  }
+  scan_node_->RangeComplete(THdfsFileFormat::ORC, compression_type);
+
+  for (int i = 0; i < filter_ctxs_.size(); ++i) {
+    const FilterStats* stats = filter_ctxs_[i]->stats;
+    const LocalFilterStats& local = filter_stats_[i];
+    stats->IncrCounters(FilterStats::ROWS_KEY, local.total_possible,
+        local.considered, local.rejected);
+  }
+  CloseInternal();
+}
+
+Status HdfsOrcScanner::ProcessFileTail() {
+  unique_ptr<orc::InputStream> input_stream(new ScanRangeInputStream(this));
+  VLOG_FILE << "Processing FileTail of ORC file: " << input_stream->getName()
+      << ", length: " << input_stream->getLength();
+  try {
+    reader_ = orc::createReader(move(input_stream), reader_options_);
+  } catch (ResourceError& e) {  // errors throw from the orc scanner
+    parse_status_ = e.GetStatus();
+    return parse_status_;
+  } catch (std::exception& e) { // other errors throw from the orc library
+    string msg = Substitute("Encountered parse error in tail of ORC file $0: 
$1",
+        filename(), e.what());
+    parse_status_ = Status(msg);
+    return parse_status_;
+  }
+
+  if (reader_->getNumberOfRows() == 0)  return Status::OK();
+  if (reader_->getNumberOfStripes() == 0) {
+    return Status(Substitute("Invalid ORC file: $0. No stripes in this file 
but"
+        " numberOfRows in footer is $1", filename(), 
reader_->getNumberOfRows()));
+  }
+  return Status::OK();
+}
+
+inline THdfsCompression::type HdfsOrcScanner::TranslateCompressionKind(
+    orc::CompressionKind kind) {
+  switch (kind) {
+    case orc::CompressionKind::CompressionKind_NONE: return 
THdfsCompression::NONE;
+    // zlib used in ORC is corresponding to Deflate in Impala
+    case orc::CompressionKind::CompressionKind_ZLIB: return 
THdfsCompression::DEFLATE;
+    case orc::CompressionKind::CompressionKind_SNAPPY: return 
THdfsCompression::SNAPPY;
+    case orc::CompressionKind::CompressionKind_LZO: return 
THdfsCompression::LZO;
+    case orc::CompressionKind::CompressionKind_LZ4: return 
THdfsCompression::LZ4;
+    case orc::CompressionKind::CompressionKind_ZSTD: return 
THdfsCompression::ZSTD;
+    default:
+      VLOG_QUERY << "Unknown compression kind of orc::CompressionKind: " << 
kind;
+  }
+  return THdfsCompression::DEFAULT;
+}
+
+Status HdfsOrcScanner::SelectColumns(const TupleDescriptor* tuple_desc) {
+  list<uint64_t> selected_indices;
+  int num_columns = 0;
+  const orc::Type& root_type = reader_->getType();
+  // TODO validate columns. e.g. scale of decimal type
+  for (SlotDescriptor* slot_desc: tuple_desc->slots()) {
+    // Skip partition columns
+    if (slot_desc->col_pos() < scan_node_->num_partition_keys()) continue;
+
+    const SchemaPath &path = slot_desc->col_path();
+    DCHECK_EQ(path.size(), 1);
+    int col_idx = path[0];
+    // The first index in a path includes the table's partition keys
+    int col_idx_in_file = col_idx - scan_node_->num_partition_keys();
+    if (col_idx_in_file >= root_type.getSubtypeCount()) {
+      // In this case, we are selecting a column that is not in the file.
+      // Update the template tuple to put a NULL in this slot.
+      Tuple** template_tuple = &template_tuple_map_[tuple_desc];
+      if (*template_tuple == nullptr) {
+        *template_tuple =
+            Tuple::Create(tuple_desc->byte_size(), template_tuple_pool_.get());
+      }
+      (*template_tuple)->SetNull(slot_desc->null_indicator_offset());
+      continue;
+    }
+    selected_indices.push_back(col_idx_in_file);
+    const orc::Type* orc_type = root_type.getSubtype(col_idx_in_file);
+    const ColumnType& col_type = 
scan_node_->hdfs_table()->col_descs()[col_idx].type();
+    // TODO(IMPALA-6503): Support reading complex types from ORC format files
+    DCHECK(!col_type.IsComplexType()) << "Complex types are not supported yet";
+    RETURN_IF_ERROR(ValidateType(col_type, *orc_type));
+    col_id_slot_map_[orc_type->getColumnId()] = slot_desc;
+    ++num_columns;
+  }
+  COUNTER_SET(num_cols_counter_, static_cast<int64_t>(num_columns));
+  row_reader_options.include(selected_indices);
+  return Status::OK();
+}
+
+Status HdfsOrcScanner::ValidateType(const ColumnType& type, const orc::Type& 
orc_type) {
+  switch (orc_type.getKind()) {
+    case orc::TypeKind::BOOLEAN:
+      if (type.type == TYPE_BOOLEAN) return Status::OK();
+      break;
+    case orc::TypeKind::BYTE:
+      if (type.type == TYPE_TINYINT || type.type == TYPE_SMALLINT
+          || type.type == TYPE_INT || type.type == TYPE_BIGINT)
+        return Status::OK();
+      break;
+    case orc::TypeKind::SHORT:
+      if (type.type == TYPE_SMALLINT || type.type == TYPE_INT
+          || type.type == TYPE_BIGINT)
+        return Status::OK();
+      break;
+    case orc::TypeKind::INT:
+      if (type.type == TYPE_INT || type.type == TYPE_BIGINT) return 
Status::OK();
+      break;
+    case orc::TypeKind::LONG:
+      if (type.type == TYPE_BIGINT) return Status::OK();
+      break;
+    case orc::TypeKind::FLOAT:
+    case orc::TypeKind::DOUBLE:
+      if (type.type == TYPE_FLOAT || type.type == TYPE_DOUBLE) return 
Status::OK();
+      break;
+    case orc::TypeKind::STRING:
+    case orc::TypeKind::VARCHAR:
+    case orc::TypeKind::CHAR:
+      if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR
+          || type.type == TYPE_CHAR)
+        return Status::OK();
+      break;
+    case orc::TypeKind::TIMESTAMP:
+      if (type.type == TYPE_TIMESTAMP) return Status::OK();
+      break;
+    case orc::TypeKind::DECIMAL: {
+      if (type.type != TYPE_DECIMAL || type.scale != orc_type.getScale()) 
break;
+      bool overflow = false;
+      int orc_precision = orc_type.getPrecision();
+      if (orc_precision == 0 || orc_precision > 
ColumnType::MAX_DECIMAL8_PRECISION) {
+        // For ORC decimals whose precision is larger than 18, its value can't 
fit into
+        // an int64 (10^19 > 2^63). So we should use int128 (16 bytes) for 
this case.
+        // The possible byte sizes for Impala decimals are 4, 8, 16.
+        // We mark it as overflow if the target byte size is not 16.
+        overflow = (type.GetByteSize() != 16);
+      } else if (orc_type.getPrecision() > ColumnType::MAX_DECIMAL4_PRECISION) 
{
+        // For ORC decimals whose precision <= 18 and > 9, int64 and int128 
can fit them.
+        // We only mark it as overflow if the target byte size is 4.
+        overflow = (type.GetByteSize() == 4);
+      }
+      if (!overflow) return Status::OK();
+      return Status(Substitute(
+          "It can't be truncated to table column $2 for column $0 in ORC file 
'$1'",
+          orc_type.toString(), filename(), type.DebugString()));
+    }
+    default: break;
+  }
+  return Status(Substitute(
+      "Type mismatch: table column $0 is map to column $1 in ORC file '$2'",
+      type.DebugString(), orc_type.toString(), filename()));
+}
+
+Status HdfsOrcScanner::ProcessSplit() {
+  DCHECK(scan_node_->HasRowBatchQueue());
+  HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
+  do {
+    unique_ptr<RowBatch> batch = make_unique<RowBatch>(scan_node_->row_desc(),
+        state_->batch_size(), scan_node_->mem_tracker());
+    Status status = GetNextInternal(batch.get());
+    // Always add batch to the queue because it may contain data referenced by 
previously
+    // appended batches.
+    scan_node->AddMaterializedRowBatch(move(batch));
+    RETURN_IF_ERROR(status);
+    ++row_batches_produced_;
+    if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) 
== 0) {
+      CheckFiltersEffectiveness();
+    }
+  } while (!eos_ && !scan_node_->ReachedLimit());
+  return Status::OK();
+}
+
+Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
+  if (scan_node_->IsZeroSlotTableScan()) {
+    uint64_t file_rows = reader_->getNumberOfRows();
+    // There are no materialized slots, e.g. count(*) over the table.  We can 
serve
+    // this query from just the file metadata.  We don't need to read the 
column data.
+    if (stripe_rows_read_ == file_rows) {
+      eos_ = true;
+      return Status::OK();
+    }
+    assemble_rows_timer_.Start();
+    DCHECK_LT(stripe_rows_read_, file_rows);
+    int64_t rows_remaining = file_rows - stripe_rows_read_;
+    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+    Status status = CommitRows(num_to_commit, row_batch);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(status);
+    stripe_rows_read_ += max_tuples;
+    COUNTER_ADD(scan_node_->rows_read_counter(), num_to_commit);
+    return Status::OK();
+  }
+
+  // reset tuple memory. We'll allocate it the first time we use it.
+  tuple_mem_ = nullptr;
+  tuple_ = nullptr;
+
+  // Transfer remaining tuples from the scratch batch.
+  if (ScratchBatchNotEmpty()) {
+    assemble_rows_timer_.Start();
+    RETURN_IF_ERROR(TransferScratchTuples(row_batch));
+    assemble_rows_timer_.Stop();
+    if (row_batch->AtCapacity()) return Status::OK();
+    DCHECK_EQ(scratch_batch_tuple_idx_, scratch_batch_->numElements);
+  }
+
+  while (advance_stripe_ || end_of_stripe_) {
+    context_->ReleaseCompletedResources(/* done */ true);
+    // Commit the rows to flush the row batch from the previous stripe
+    RETURN_IF_ERROR(CommitRows(0, row_batch));
+
+    RETURN_IF_ERROR(NextStripe());
+    DCHECK_LE(stripe_idx_, reader_->getNumberOfStripes());
+    if (stripe_idx_ == reader_->getNumberOfStripes()) {
+      eos_ = true;
+      DCHECK(parse_status_.ok());
+      return Status::OK();
+    }
+  }
+
+  // Apply any runtime filters to static tuples containing the partition keys 
for this
+  // partition. If any filter fails, we return immediately and stop processing 
this
+  // scan range.
+  if 
(!scan_node_->PartitionPassesFilters(context_->partition_descriptor()->id(),
+      FilterStats::ROW_GROUPS_KEY, context_->filter_ctxs())) {
+    eos_ = true;
+    DCHECK(parse_status_.ok());
+    return Status::OK();
+  }
+  assemble_rows_timer_.Start();
+  Status status = AssembleRows(row_batch);
+  assemble_rows_timer_.Stop();
+  RETURN_IF_ERROR(status);
+  if (!parse_status_.ok()) {
+    RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
+    parse_status_ = Status::OK();
+  }
+  return Status::OK();
+}
+
+inline bool HdfsOrcScanner::ScratchBatchNotEmpty() {
+  return scratch_batch_ != nullptr
+      && scratch_batch_tuple_idx_ < scratch_batch_->numElements;
+}
+
+inline static bool CheckStripeOverlapsSplit(int64_t stripe_start, int64_t 
stripe_end,
+    int64_t split_start, int64_t split_end) {
+  return (split_start >= stripe_start && split_start < stripe_end) ||
+      (split_end > stripe_start && split_end <= stripe_end) ||
+      (split_start <= stripe_start && split_end >= stripe_end);
+}
+
+Status HdfsOrcScanner::NextStripe() {
+  const ScanRange* split_range = static_cast<ScanRangeMetadata*>(
+      metadata_range_->meta_data())->original_split;
+  int64_t split_offset = split_range->offset();
+  int64_t split_length = split_range->len();
+
+  bool start_with_first_stripe = stripe_idx_ == -1;
+  bool misaligned_stripe_skipped = false;
+
+  advance_stripe_ = false;
+  stripe_rows_read_ = 0;
+
+  // Loop until we have found a non-empty stripe.
+  while (true) {
+    // Reset the parse status for the next stripe.
+    parse_status_ = Status::OK();
+
+    ++stripe_idx_;
+    if (stripe_idx_ >= reader_->getNumberOfStripes()) {
+      if (start_with_first_stripe && misaligned_stripe_skipped) {
+        // We started with the first stripe and skipped all the stripes 
because they were
+        // misaligned. The execution flow won't reach this point if there is 
at least one
+        // non-empty stripe which this scanner can process.
+        COUNTER_ADD(num_scanners_with_no_reads_counter_, 1);
+      }
+      break;
+    }
+    unique_ptr<orc::StripeInformation> stripe = 
reader_->getStripe(stripe_idx_);
+    // Also check 'footer_.numberOfRows' to make sure 'select count(*)' and 
'select *'
+    // behave consistently for corrupt files that have 'footer_.numberOfRows 
== 0'
+    // but some data in stripe.
+    if (stripe->getNumberOfRows() == 0 || reader_->getNumberOfRows() == 0) 
continue;
+
+    uint64_t stripe_offset = stripe->getOffset();
+    uint64_t stripe_len = stripe->getIndexLength() + stripe->getDataLength() +
+        stripe->getFooterLength();
+    int64_t stripe_mid_pos = stripe_offset + stripe_len / 2;
+    if (!(stripe_mid_pos >= split_offset &&
+        stripe_mid_pos < split_offset + split_length)) {
+      // Middle pos not in split, this stripe will be handled by a different 
scanner.
+      // Mark if the stripe overlaps with the split.
+      misaligned_stripe_skipped |= CheckStripeOverlapsSplit(stripe_offset,
+          stripe_offset + stripe_len, split_offset, split_offset + 
split_length);
+      continue;
+    }
+
+    // TODO: check if this stripe can be skipped by stats. e.g. IMPALA-6505
+
+    COUNTER_ADD(num_stripes_counter_, 1);
+    row_reader_options.range(stripe->getOffset(), stripe_len);
+    try {
+      row_reader_ = reader_->createRowReader(row_reader_options);
+    } catch (ResourceError& e) {  // errors throw from the orc scanner
+      parse_status_ = e.GetStatus();
+      return parse_status_;
+    } catch (std::exception& e) { // errors throw from the orc library
+      VLOG_QUERY << "Error in creating ORC column readers: " << e.what();
+      parse_status_ = Status(
+          Substitute("Error in creating ORC column readers: $0.", e.what()));
+      return parse_status_;
+    }
+    end_of_stripe_ = false;
+    VLOG_ROW << Substitute("Created RowReader for stripe(offset=$0, len=$1) in 
file $2",
+        stripe->getOffset(), stripe_len, filename());
+    break;
+  }
+
+  DCHECK(parse_status_.ok());
+  return Status::OK();
+}
+
+// Fills 'row_batch' with rows of the current stripe. Creates a fresh 'scratch_batch_'
+// sized to row_batch->capacity(), repeatedly asks the ORC RowReader to fill it, and
+// moves its rows into 'row_batch' through TransferScratchTuples(). Stops when
+// 'row_batch' is at capacity, the stripe has no more data (sets 'end_of_stripe_'),
+// the scan node reached its limit, or the query was cancelled.
+// Returns CANCELLED if the limit is already reached or the query is cancelled on
+// entry. A ResourceError thrown during reading propagates its Status; any other
+// exception from the ORC library becomes a parse error (and sets 'eos_'). In both
+// cases the error is also stored in 'parse_status_'.
+Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
+  bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
+  if (!continue_execution) return Status::CANCELLED;
+
+  scratch_batch_tuple_idx_ = 0;
+  scratch_batch_ = row_reader_->createRowBatch(row_batch->capacity());
+  DCHECK_EQ(scratch_batch_->numElements, 0);
+
+  int64_t num_rows_read = 0;
+  while (continue_execution) {  // one ORC scratch batch (ColumnVectorBatch) in a round
+    if (scratch_batch_tuple_idx_ == scratch_batch_->numElements) {
+      // Every row of the scratch batch has been consumed; fetch the next one.
+      try {
+        if (!row_reader_->next(*scratch_batch_)) {
+          end_of_stripe_ = true;
+          break; // no more data to process
+        }
+      } catch (ResourceError& e) {  // errors thrown from the orc scanner
+        parse_status_ = e.GetStatus();
+        return parse_status_;
+      } catch (std::exception& e) {  // errors thrown from the orc library
+        VLOG_QUERY << "Encounter parse error: " << e.what();
+        parse_status_ = Status(Substitute("Encounter parse error: $0.", e.what()));
+        eos_ = true;
+        return parse_status_;
+      }
+      if (scratch_batch_->numElements == 0) {
+        // An empty batch also ends the stripe. Break (instead of returning) so the
+        // rows read in earlier rounds are still added to the counters below.
+        RETURN_IF_ERROR(CommitRows(0, row_batch));
+        end_of_stripe_ = true;
+        break;
+      }
+      num_rows_read += scratch_batch_->numElements;
+      scratch_batch_tuple_idx_ = 0;
+    }
+
+    RETURN_IF_ERROR(TransferScratchTuples(row_batch));
+    if (row_batch->AtCapacity()) break;
+    continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
+  }
+  stripe_rows_read_ += num_rows_read;
+  COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
+  return Status::OK();
+}
+
+// Moves rows of 'scratch_batch_', starting at 'scratch_batch_tuple_idx_', into
+// 'dst_batch' until either 'dst_batch' is full or the scratch batch is drained.
+// Every row is materialized with ReadRow(); it is kept only if it passes the runtime
+// filters and then the conjuncts. Kept rows are committed with CommitRows(), whose
+// status is returned.
+Status HdfsOrcScanner::TransferScratchTuples(RowBatch* dst_batch) {
+  const TupleDescriptor* desc = scan_node_->tuple_desc();
+  const int tuple_byte_size = desc->byte_size();
+
+  ScalarExprEvaluator* const* evals = conjunct_evals_->data();
+  const int num_evals = conjunct_evals_->size();
+
+  // ReadRow() walks the children of this struct type, one per column.
+  const orc::Type* selected_type = &row_reader_->getSelectedType();
+  DCHECK_EQ(selected_type->getKind(), orc::TypeKind::STRUCT);
+
+  DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
+  // Tuple memory is allocated lazily, the first time this batch needs it.
+  if (tuple_ == nullptr) RETURN_IF_ERROR(AllocateTupleMem(dst_batch));
+
+  const int max_rows = dst_batch->capacity();
+  int dst_idx = dst_batch->num_rows();
+  int num_kept = 0;
+  TupleRow* cur_row = dst_batch->GetRow(dst_idx);
+  Tuple* cur_tuple = tuple_;  // 'tuple_' itself only moves forward in CommitRows()
+
+  // TODO(IMPALA-6506): codegen the runtime filter + conjunct evaluation loop
+  // TODO: transfer the scratch_batch_ column-by-column for batch, and then evaluate
+  // the predicates in later loop.
+  while (dst_idx < max_rows && ScratchBatchNotEmpty()) {
+    DCHECK_LT((void*)cur_tuple, (void*)tuple_mem_end_);
+    InitTuple(desc, template_tuple_, cur_tuple);
+    const int src_idx = scratch_batch_tuple_idx_++;
+    RETURN_IF_ERROR(ReadRow(static_cast<const orc::StructVectorBatch&>(*scratch_batch_),
+        src_idx, selected_type, cur_tuple, dst_batch));
+    cur_row->SetTuple(scan_node_->tuple_idx(), cur_tuple);
+    // Conjuncts are only evaluated for rows the runtime filters let through.
+    const bool keep_row = EvalRuntimeFilters(cur_row) &&
+        ExecNode::EvalConjuncts(evals, num_evals, cur_row);
+    // A rejected row leaves cur_row/cur_tuple in place, so the next row reuses them.
+    if (!keep_row) continue;
+    cur_row = next_row(cur_row);
+    cur_tuple = next_tuple(tuple_byte_size, cur_tuple);
+    ++dst_idx;
+    ++num_kept;
+  }
+  VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
+      num_kept, dst_batch->num_rows());
+  return CommitRows(num_kept, dst_batch);
+}
+
+// Obtains a tuple buffer from 'row_batch' and makes 'tuple_mem_' / 'tuple_' point at
+// its start and 'tuple_mem_end_' just past its end. The buffer call may resize the
+// batch, so capacity is checked afterwards.
+Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
+  int64_t buffer_bytes;
+  RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(
+      state_, &buffer_bytes, &tuple_mem_));
+  tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
+  // Upper bound checked by the DCHECK in TransferScratchTuples().
+  tuple_mem_end_ = &tuple_mem_[buffer_bytes];
+  DCHECK_GT(row_batch->capacity(), 0);
+  return Status::OK();
+}
+
+// Materializes row 'row_idx' of the struct batch 'batch' into 'tuple'. 'orc_type' is
+// the struct type of 'batch'; each of its children is one column, written to the slot
+// that 'col_id_slot_map_' maps the child's ORC column id to. A null value only sets
+// the slot's null indicator. STRING/VARCHAR data is copied into the tuple data pool of
+// 'dst_batch' because the scratch batch memory is reused for the next batch; if that
+// allocation fails, a mem-limit-exceeded status is returned.
+inline Status HdfsOrcScanner::ReadRow(const orc::StructVectorBatch& batch, int row_idx,
+    const orc::Type* orc_type, Tuple* tuple, RowBatch* dst_batch) {
+  for (unsigned int c = 0; c < orc_type->getSubtypeCount(); ++c) {
+    orc::ColumnVectorBatch* col_batch = batch.fields[c];
+    const orc::Type* col_type = orc_type->getSubtype(c);
+    const SlotDescriptor* slot_desc = DCHECK_NOTNULL(
+        col_id_slot_map_[col_type->getColumnId()]);
+    if (col_batch->hasNulls && !col_batch->notNull[row_idx]) {
+      tuple->SetNull(slot_desc->null_indicator_offset());
+      continue;
+    }
+    void* slot_val_ptr = tuple->GetSlot(slot_desc->tuple_offset());
+    switch (col_type->getKind()) {
+      case orc::TypeKind::BOOLEAN: {
+        int64_t val = static_cast<const orc::LongVectorBatch*>(col_batch)->
+            data.data()[row_idx];
+        *(reinterpret_cast<bool*>(slot_val_ptr)) = (val != 0);
+        break;
+      }
+      case orc::TypeKind::BYTE:
+      case orc::TypeKind::SHORT:
+      case orc::TypeKind::INT:
+      case orc::TypeKind::LONG: {
+        // All ORC integer kinds are read from a LongVectorBatch and narrowed to the
+        // width of the slot's Impala type.
+        const orc::LongVectorBatch* long_batch =
+            static_cast<const orc::LongVectorBatch*>(col_batch);
+        int64_t val = long_batch->data.data()[row_idx];
+        switch (slot_desc->type().type) {
+          case TYPE_TINYINT:
+            *(reinterpret_cast<int8_t*>(slot_val_ptr)) = val;
+            break;
+          case TYPE_SMALLINT:
+            *(reinterpret_cast<int16_t*>(slot_val_ptr)) = val;
+            break;
+          case TYPE_INT:
+            *(reinterpret_cast<int32_t*>(slot_val_ptr)) = val;
+            break;
+          case TYPE_BIGINT:
+            *(reinterpret_cast<int64_t*>(slot_val_ptr)) = val;
+            break;
+          default:
+            DCHECK(false) << "Illegal translation from impala type "
+                << slot_desc->DebugString() << " to orc INT";
+        }
+        break;
+      }
+      case orc::TypeKind::FLOAT:
+      case orc::TypeKind::DOUBLE: {
+        double val =
+            static_cast<const orc::DoubleVectorBatch*>(col_batch)->data.data()[row_idx];
+        if (slot_desc->type().type == TYPE_FLOAT) {
+          *(reinterpret_cast<float*>(slot_val_ptr)) = val;
+        } else {
+          DCHECK_EQ(slot_desc->type().type, TYPE_DOUBLE);
+          *(reinterpret_cast<double*>(slot_val_ptr)) = val;
+        }
+        break;
+      }
+      case orc::TypeKind::STRING:
+      case orc::TypeKind::VARCHAR:
+      case orc::TypeKind::CHAR: {
+        auto str_batch = static_cast<const orc::StringVectorBatch*>(col_batch);
+        const char* src_ptr = str_batch->data.data()[row_idx];
+        int64_t src_len = str_batch->length.data()[row_idx];
+        int dst_len = slot_desc->type().len;
+        if (slot_desc->type().type == TYPE_CHAR) {
+          // CHAR(n) bytes are written directly into the slot: at most n bytes are
+          // copied and the remainder is filled by StringValue::PadWithSpaces().
+          int unpadded_len = min(dst_len, static_cast<int>(src_len));
+          char* dst_char = reinterpret_cast<char*>(slot_val_ptr);
+          memcpy(dst_char, src_ptr, unpadded_len);
+          StringValue::PadWithSpaces(dst_char, dst_len, unpadded_len);
+          break;
+        }
+        StringValue* dst = reinterpret_cast<StringValue*>(slot_val_ptr);
+        // VARCHAR(n) values longer than n are truncated; STRING keeps the full length.
+        if (slot_desc->type().type == TYPE_VARCHAR && src_len > dst_len) {
+          dst->len = dst_len;
+        } else {
+          dst->len = src_len;
+        }
+        // Space in the StringVectorBatch is allocated by reader_mem_pool_. It will be
+        // reused at next batch, so we allocate a new space for this string.
+        uint8_t* buffer = dst_batch->tuple_data_pool()->TryAllocate(dst->len);
+        if (buffer == nullptr) {
+          string details = Substitute("Could not allocate string buffer of $0 bytes "
+              "for ORC file '$1'.", dst->len, filename());
+          return scan_node_->mem_tracker()->MemLimitExceeded(
+              state_, details, dst->len);
+        }
+        dst->ptr = reinterpret_cast<char*>(buffer);
+        memcpy(dst->ptr, src_ptr, dst->len);
+        break;
+      }
+      case orc::TypeKind::TIMESTAMP: {
+        const orc::TimestampVectorBatch* ts_batch =
+            static_cast<const orc::TimestampVectorBatch*>(col_batch);
+        int64_t secs = ts_batch->data.data()[row_idx];
+        int64_t nanos = ts_batch->nanoseconds.data()[row_idx];
+        *reinterpret_cast<TimestampValue*>(slot_val_ptr) =
+            TimestampValue::FromUnixTimeNanos(secs, nanos);
+        break;
+      }
+      case orc::TypeKind::DECIMAL: {
+        // For decimals whose precision is larger than 18, its value can't fit into
+        // an int64 (10^19 > 2^63). So we should use int128 for this case. Precision 0
+        // is routed to the 128-bit batch as well.
+        if (col_type->getPrecision() == 0 || col_type->getPrecision() > 18) {
+          auto int128_batch = static_cast<const orc::Decimal128VectorBatch*>(col_batch);
+          orc::Int128 orc_val = int128_batch->values.data()[row_idx];
+
+          DCHECK_EQ(slot_desc->type().GetByteSize(), 16);
+          // Assemble the two's-complement bits in unsigned arithmetic: left-shifting
+          // a negative signed int128 is undefined behavior.
+          __uint128_t bits = static_cast<uint64_t>(orc_val.getHighBits());
+          bits <<= 64;
+          bits |= orc_val.getLowBits();
+          int128_t val = static_cast<int128_t>(bits);
+          // Use memcpy so gcc cannot emit 16-byte aligned moves (e.g. movaps) for the
+          // int128_t store; they raise a segmentation fault when the slot address is
+          // not 16-byte aligned.
+          memcpy(slot_val_ptr, &val, sizeof(int128_t));
+        } else {
+          // Reminder: even decimal(1,1) is stored in int64 batch
+          auto int64_batch = static_cast<const orc::Decimal64VectorBatch*>(col_batch);
+          int64_t val = int64_batch->values.data()[row_idx];
+
+          switch (slot_desc->type().GetByteSize()) {
+            case 4:
+              reinterpret_cast<Decimal4Value*>(slot_val_ptr)->value() = val;
+              break;
+            case 8:
+              reinterpret_cast<Decimal8Value*>(slot_val_ptr)->value() = val;
+              break;
+            case 16: {
+              // Same alignment hazard as the int128 path above: widen, then memcpy
+              // instead of storing through a Decimal16Value pointer.
+              int128_t wide_val = val;
+              memcpy(slot_val_ptr, &wide_val, sizeof(int128_t));
+              break;
+            }
+            default: DCHECK(false) << "invalid byte size";
+          }
+        }
+        break;
+      }
+      case orc::TypeKind::LIST:
+      case orc::TypeKind::MAP:
+      case orc::TypeKind::STRUCT:
+      case orc::TypeKind::UNION:
+      default:
+        DCHECK(false) << slot_desc->type().DebugString() << " map to ORC column "
+            << col_type->toString();
+    }
+  }
+  return Status::OK();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-orc-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
new file mode 100644
index 0000000..837d92a
--- /dev/null
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_HDFS_ORC_SCANNER_H
+#define IMPALA_EXEC_HDFS_ORC_SCANNER_H
+
+#include <orc/OrcFile.hh>
+
+#include "runtime/runtime-state.h"
+#include "exec/hdfs-scanner.h"
+#include "exec/hdfs-scan-node.h"
+#include "util/runtime-profile-counters.h"
+
+namespace impala {
+
+struct HdfsFileDesc;
+
+/// This scanner leverages the ORC library to parse ORC files located in HDFS. Data is
+/// transformed into Impala in-memory representation, i.e. Tuples, RowBatches.
+///
+/// For the file format spec, see https://orc.apache.org/docs/spec-intro.html
+class HdfsOrcScanner : public HdfsScanner {
+ public:
+  /// Exception thrown by the orc scanner to stop the orc::RowReader. It's used for
+  /// IO errors (e.g. cancellation) or memory errors (e.g. mem_limit exceeded). It
+  /// carries the exact error Status, which the catching code stores in parse_status_.
+  class ResourceError : public std::runtime_error {
+   public:
+    explicit ResourceError(const Status& status)
+      : runtime_error(status.msg().msg()), status_(status) {}
+    virtual ~ResourceError() {}
+    Status& GetStatus() { return status_; }
+
+   private:
+    Status status_;
+  };
+
+  /// orc::MemoryPool used by the ORC readers (owned by 'reader_mem_pool_'). It keeps
+  /// the owning scanner, a MemTracker and a map from chunk pointer to chunk size.
+  /// NOTE(review): presumably malloc()/free() charge/release 'mem_tracker_' and
+  /// FreeAll() releases every chunk still in 'chunk_sizes_' -- confirm in the .cc.
+  class OrcMemPool : public orc::MemoryPool {
+   public:
+    explicit OrcMemPool(HdfsOrcScanner* scanner);
+    virtual ~OrcMemPool();
+
+    char* malloc(uint64_t size) override;
+    void free(char* p) override;
+
+    void FreeAll();
+
+   private:
+    HdfsOrcScanner* scanner_;
+    MemTracker* mem_tracker_;
+    boost::unordered_map<char*, uint64_t> chunk_sizes_;
+  };
+
+  /// orc::InputStream through which the ORC library reads the scanner's file. The
+  /// length comes from the scan node's file descriptor, and the natural read size is
+  /// the IO manager's max read buffer size.
+  class ScanRangeInputStream : public orc::InputStream {
+   public:
+    explicit ScanRangeInputStream(HdfsOrcScanner* scanner) {
+      // Assigned in the body rather than an init list: 'file_desc_' is looked up by
+      // 'filename_', which is declared (and would be initialized) after it.
+      this->scanner_ = scanner;
+      this->filename_ = scanner->filename();
+      this->file_desc_ = scanner->scan_node_->GetFileDesc(
+          scanner->context_->partition_descriptor()->id(), filename_);
+    }
+
+    uint64_t getLength() const override {
+      return file_desc_->file_length;
+    }
+
+    uint64_t getNaturalReadSize() const override {
+      return scanner_->state_->io_mgr()->max_read_buffer_size();
+    }
+
+    void read(void* buf, uint64_t length, uint64_t offset) override;
+
+    const std::string& getName() const override {
+      return filename_;
+    }
+
+   private:
+    HdfsOrcScanner* scanner_;
+    HdfsFileDesc* file_desc_;
+    std::string filename_;
+  };
+
+  HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state);
+  virtual ~HdfsOrcScanner();
+
+  /// Issue just the footer range for each file.  We'll then parse the footer and pick
+  /// out the columns we want.
+  static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
+      const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT;
+
+  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
+  virtual Status ProcessSplit() override WARN_UNUSED_RESULT;
+  virtual void Close(RowBatch* row_batch) override;
+
+ private:
+  friend class HdfsOrcScannerTest;
+
+  /// End of the tuple buffer set up by AllocateTupleMem(); guards writes into
+  /// 'tuple_mem_'.
+  uint8_t* tuple_mem_end_ = nullptr;
+
+  /// Index of the current stripe being processed. Initialized to -1 which indicates
+  /// that we have not started processing the first stripe yet (GetNext() has not yet
+  /// been called).
+  int32_t stripe_idx_ = -1;
+
+  /// Counts the number of rows processed for the current stripe.
+  int64_t stripe_rows_read_ = 0;
+
+  /// Indicates whether we should advance to the next stripe in the next GetNext().
+  /// Starts out as true to move to the very first stripe.
+  bool advance_stripe_ = true;
+
+  /// Indicates whether we are at the end of a stripe.
+  bool end_of_stripe_ = true;
+
+  /// Number of scratch batches processed so far.
+  int64_t row_batches_produced_ = 0;
+
+  /// Mem pool used in orc readers.
+  boost::scoped_ptr<OrcMemPool> reader_mem_pool_;
+
+  /// orc::Reader's responsibility is to read the footer and metadata from an ORC file.
+  /// It creates orc::RowReader for further materialization. orc::RowReader is used for
+  /// reading rows from the file.
+  std::unique_ptr<orc::Reader> reader_ = nullptr;
+  std::unique_ptr<orc::RowReader> row_reader_ = nullptr;
+
+  /// Orc reader will write slot values into this scratch batch for top-level tuples.
+  /// See AssembleRows().
+  std::unique_ptr<orc::ColumnVectorBatch> scratch_batch_;
+
+  /// Index of the next row of 'scratch_batch_' to transfer into a RowBatch.
+  int scratch_batch_tuple_idx_ = 0;
+
+  /// ReaderOptions used to create orc::Reader.
+  orc::ReaderOptions reader_options_;
+
+  /// RowReaderOptions used to create orc::RowReader.
+  orc::RowReaderOptions row_reader_options;
+
+  /// Column id is the pre order id in orc::Type tree.
+  /// Map from column id to slot descriptor.
+  boost::unordered_map<int, const SlotDescriptor*> col_id_slot_map_;
+
+  /// Scan range for the metadata. filename() reads the file name from it.
+  const io::ScanRange* metadata_range_ = nullptr;
+
+  /// Timer for materializing rows. This ignores time getting the next buffer.
+  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
+
+  /// Average and min/max time spent processing the footer by each split.
+  RuntimeProfile::SummaryStatsCounter* process_footer_timer_stats_ = nullptr;
+
+  /// Number of columns that need to be read.
+  RuntimeProfile::Counter* num_cols_counter_ = nullptr;
+
+  /// Number of stripes that need to be read.
+  RuntimeProfile::Counter* num_stripes_counter_ = nullptr;
+
+  /// Number of scanners that end up doing no reads because their splits don't overlap
+  /// with the midpoint of any stripe in the file.
+  RuntimeProfile::Counter* num_scanners_with_no_reads_counter_ = nullptr;
+
+  /// Name of the file being scanned, taken from 'metadata_range_'.
+  const char* filename() const { return metadata_range_->file(); }
+
+  /// Implements HdfsScanner::GetNextInternal().
+  virtual Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
+
+  /// Advances 'stripe_idx_' to the next non-empty stripe and initializes
+  /// row_reader_ to scan it.
+  Status NextStripe() WARN_UNUSED_RESULT;
+
+  /// Reads data using orc-reader to materialize instances of 'tuple_desc'.
+  /// Returns a non-OK status if a non-recoverable error was encountered and execution
+  /// of this query should be terminated immediately.
+  Status AssembleRows(RowBatch* row_batch) WARN_UNUSED_RESULT;
+
+  /// Function used by TransferScratchTuples() to read a single row from scratch_batch_
+  /// into 'tuple'. String data is copied into the tuple data pool of 'dst_batch'.
+  Status ReadRow(const orc::StructVectorBatch& batch, int row_idx,
+      const orc::Type* orc_type, Tuple* tuple, RowBatch* dst_batch) WARN_UNUSED_RESULT;
+
+  /// Evaluates runtime filters and conjuncts (if any) against the tuples in
+  /// 'scratch_batch_', adds the surviving tuples to 'dst_batch' and commits them with
+  /// CommitRows(). Returns an error if tuple memory cannot be allocated, a row cannot
+  /// be read, or committing fails.
+  Status TransferScratchTuples(RowBatch* dst_batch) WARN_UNUSED_RESULT;
+
+  /// Processes the tail (footer and metadata) of the ORC file.
+  /// NOTE(review): presumably this creates 'reader_' -- confirm against the
+  /// implementation.
+  Status ProcessFileTail() WARN_UNUSED_RESULT;
+
+  /// Update reader options used in orc reader by the given tuple descriptor.
+  Status SelectColumns(const TupleDescriptor* tuple_desc) WARN_UNUSED_RESULT;
+
+  /// Validate whether the ColumnType is compatible with the orc type
+  Status ValidateType(const ColumnType& type, const orc::Type& orc_type)
+      WARN_UNUSED_RESULT;
+
+  /// Part of the HdfsScanner interface, not used in Orc.
+  Status InitNewRange() override WARN_UNUSED_RESULT { return Status::OK(); }
+
+  /// Maps an ORC compression kind to the corresponding THdfsCompression value.
+  THdfsCompression::type TranslateCompressionKind(orc::CompressionKind kind);
+
+  /// Loop condition of TransferScratchTuples(). NOTE(review): presumably true while
+  /// scratch_batch_tuple_idx_ < scratch_batch_->numElements -- confirm.
+  inline bool ScratchBatchNotEmpty();
+
+  /// Allocates the tuple buffer of 'row_batch' and points 'tuple_mem_',
+  /// 'tuple_mem_end_' and 'tuple_' at it.
+  inline Status AllocateTupleMem(RowBatch* row_batch) WARN_UNUSED_RESULT;
+};
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-parquet-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-ir.cc 
b/be/src/exec/hdfs-parquet-scanner-ir.cc
index f2355d8..2ba56c4 100644
--- a/be/src/exec/hdfs-parquet-scanner-ir.cc
+++ b/be/src/exec/hdfs-parquet-scanner-ir.cc
@@ -65,17 +65,3 @@ int HdfsParquetScanner::ProcessScratchBatch(RowBatch* 
dst_batch) {
   scratch_batch_->tuple_idx += (scratch_tuple - scratch_tuple_start) / 
tuple_size;
   return output_row - output_row_start;
 }
-
-bool HdfsParquetScanner::EvalRuntimeFilter(int i, TupleRow* row) {
-  LocalFilterStats* stats = &filter_stats_[i];
-  const FilterContext* ctx = filter_ctxs_[i];
-  ++stats->total_possible;
-  if (stats->enabled && ctx->filter->HasFilter()) {
-    ++stats->considered;
-    if (!ctx->Eval(row)) {
-      ++stats->rejected;
-      return false;
-    }
-  }
-  return true;
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index 0d79f53..ae22149 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -38,15 +38,6 @@ using std::move;
 using namespace impala;
 using namespace impala::io;
 
-DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the 
percentage of "
-    "rows rejected by a runtime filter drops below this value, the filter is 
disabled.");
-
-// The number of row batches between checks to see if a filter is effective, 
and
-// should be disabled. Must be a power of two.
-constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
-static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
-    "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
-
 // Max dictionary page header size in bytes. This is an estimate and only 
needs to be an
 // upper bound.
 const int MAX_DICT_HEADER_SIZE = 100;
@@ -57,7 +48,6 @@ const int MAX_DICT_HEADER_SIZE = 100;
 // THIS RECORDS INFORMATION ABOUT PAST BEHAVIOR. DO NOT CHANGE THIS CONSTANT.
 const int LEGACY_IMPALA_MAX_DICT_ENTRIES = 40000;
 
-const int64_t HdfsParquetScanner::FOOTER_SIZE;
 const int16_t HdfsParquetScanner::ROW_GROUP_END;
 const int16_t HdfsParquetScanner::INVALID_LEVEL;
 const int16_t HdfsParquetScanner::INVALID_POS;
@@ -69,71 +59,14 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
 
 Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     const vector<HdfsFileDesc*>& files) {
-  vector<ScanRange*> footer_ranges;
-  for (int i = 0; i < files.size(); ++i) {
+  for (HdfsFileDesc* file : files) {
     // If the file size is less than 12 bytes, it is an invalid Parquet file.
-    if (files[i]->file_length < 12) {
+    if (file->file_length < 12) {
       return Status(Substitute("Parquet file $0 has an invalid file length: 
$1",
-          files[i]->filename, files[i]->file_length));
-    }
-    // Compute the offset of the file footer.
-    int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
-    int64_t footer_start = files[i]->file_length - footer_size;
-    DCHECK_GE(footer_start, 0);
-
-    // Try to find the split with the footer.
-    ScanRange* footer_split = FindFooterSplit(files[i]);
-
-    for (int j = 0; j < files[i]->splits.size(); ++j) {
-      ScanRange* split = files[i]->splits[j];
-
-      DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
-      // If there are no materialized slots (such as count(*) over the table), 
we can
-      // get the result with the file metadata alone and don't need to read 
any row
-      // groups. We only want a single node to process the file footer in this 
case,
-      // which is the node with the footer split.  If it's not a count(*), we 
create a
-      // footer range for the split always.
-      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
-        ScanRangeMetadata* split_metadata =
-            static_cast<ScanRangeMetadata*>(split->meta_data());
-        // Each split is processed by first issuing a scan range for the file 
footer, which
-        // is done here, followed by scan ranges for the columns of each row 
group within
-        // the actual split (in InitColumns()). The original split is stored 
in the
-        // metadata associated with the footer range.
-        ScanRange* footer_range;
-        if (footer_split != nullptr) {
-          footer_range = scan_node->AllocateScanRange(files[i]->fs,
-              files[i]->filename.c_str(), footer_size, footer_start,
-              split_metadata->partition_id, footer_split->disk_id(),
-              footer_split->expected_local(),
-              BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
-        } else {
-          // If we did not find the last split, we know it is going to be a 
remote read.
-          footer_range =
-              scan_node->AllocateScanRange(files[i]->fs, 
files[i]->filename.c_str(),
-                  footer_size, footer_start, split_metadata->partition_id, -1, 
false,
-                  BufferOpts::Uncached(), split);
-        }
-
-        footer_ranges.push_back(footer_range);
-      } else {
-        scan_node->RangeComplete(THdfsFileFormat::PARQUET, 
THdfsCompression::NONE);
-      }
+          file->filename, file->file_length));
     }
   }
-  // The threads that process the footer will also do the scan, so we mark all 
the files
-  // as complete here.
-  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
-  return Status::OK();
-}
-
-ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
-  DCHECK(file != nullptr);
-  for (int i = 0; i < file->splits.size(); ++i) {
-    ScanRange* split = file->splits[i];
-    if (split->offset() + split->len() == file->file_length) return split;
-  }
-  return nullptr;
+  return IssueFooterRanges(scan_node, THdfsFileFormat::PARQUET, files);
 }
 
 namespace impala {
@@ -379,18 +312,6 @@ int HdfsParquetScanner::CountScalarColumns(const 
vector<ParquetColumnReader*>& c
   return num_columns;
 }
 
-void HdfsParquetScanner::CheckFiltersEffectiveness() {
-  for (int i = 0; i < filter_stats_.size(); ++i) {
-    LocalFilterStats* stats = &filter_stats_[i];
-    const RuntimeFilter* filter = filter_ctxs_[i]->filter;
-    double reject_ratio = stats->rejected / 
static_cast<double>(stats->considered);
-    if (filter->AlwaysTrue() ||
-        reject_ratio < FLAGS_parquet_min_filter_reject_ratio) {
-      stats->enabled = 0;
-    }
-  }
-}
-
 Status HdfsParquetScanner::ProcessSplit() {
   DCHECK(scan_node_->HasRowBatchQueue());
   HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_);
@@ -1126,104 +1047,6 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* 
node,
   return Status::OK();
 }
 
-bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {
-  int num_filters = filter_ctxs_.size();
-  for (int i = 0; i < num_filters; ++i) {
-    if (!EvalRuntimeFilter(i, row)) return false;
-  }
-  return true;
-}
-
-// ; Function Attrs: noinline
-// define i1 @EvalRuntimeFilters(%"class.impala::HdfsParquetScanner"* %this,
-//                               %"class.impala::TupleRow"* %row) #34 {
-// entry:
-//   %0 = call i1 
@_ZN6impala18HdfsParquetScanner17EvalRuntimeFilterEiPNS_8TupleRowE.2(
-//       %"class.impala::HdfsParquetScanner"* %this, i32 0, 
%"class.impala::TupleRow"*
-//       %row)
-//   br i1 %0, label %continue, label %bail_out
-//
-// bail_out:                                         ; preds = %entry
-//   ret i1 false
-//
-// continue:                                         ; preds = %entry
-//   ret i1 true
-// }
-//
-// EvalRuntimeFilter() is the same as the cross-compiled version except 
EvalOneFilter()
-// is replaced with the one generated by CodegenEvalOneFilter().
-Status HdfsParquetScanner::CodegenEvalRuntimeFilters(
-    LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, 
llvm::Function** fn) {
-  llvm::LLVMContext& context = codegen->context();
-  LlvmBuilder builder(context);
-
-  *fn = nullptr;
-  llvm::Type* this_type = codegen->GetStructPtrType<HdfsParquetScanner>();
-  llvm::PointerType* tuple_row_ptr_type = 
codegen->GetStructPtrType<TupleRow>();
-  LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters",
-      codegen->bool_type());
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
-  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
-  llvm::Value* args[2];
-  llvm::Function* eval_runtime_filters_fn = 
prototype.GeneratePrototype(&builder, args);
-  llvm::Value* this_arg = args[0];
-  llvm::Value* row_arg = args[1];
-
-  int num_filters = filter_exprs.size();
-  if (num_filters == 0) {
-    builder.CreateRet(codegen->true_value());
-  } else {
-    // row_rejected_block: jump target for when a filter is evaluated to false.
-    llvm::BasicBlock* row_rejected_block =
-        llvm::BasicBlock::Create(context, "row_rejected", 
eval_runtime_filters_fn);
-
-    DCHECK_GT(num_filters, 0);
-    for (int i = 0; i < num_filters; ++i) {
-      llvm::Function* eval_runtime_filter_fn =
-          
codegen->GetFunction(IRFunction::PARQUET_SCANNER_EVAL_RUNTIME_FILTER, true);
-      DCHECK(eval_runtime_filter_fn != nullptr);
-
-      // Codegen function for inlining filter's expression evaluation and 
constant fold
-      // the type of the expression into the hashing function to avoid 
branches.
-      llvm::Function* eval_one_filter_fn;
-      DCHECK(filter_exprs[i] != nullptr);
-      RETURN_IF_ERROR(FilterContext::CodegenEval(codegen, filter_exprs[i],
-          &eval_one_filter_fn));
-      DCHECK(eval_one_filter_fn != nullptr);
-
-      int replaced = codegen->ReplaceCallSites(eval_runtime_filter_fn, 
eval_one_filter_fn,
-          "FilterContext4Eval");
-      DCHECK_EQ(replaced, 1);
-
-      llvm::Value* idx = codegen->GetI32Constant(i);
-      llvm::Value* passed_filter = builder.CreateCall(
-          eval_runtime_filter_fn, llvm::ArrayRef<llvm::Value*>({this_arg, idx, 
row_arg}));
-
-      llvm::BasicBlock* continue_block =
-          llvm::BasicBlock::Create(context, "continue", 
eval_runtime_filters_fn);
-      builder.CreateCondBr(passed_filter, continue_block, row_rejected_block);
-      builder.SetInsertPoint(continue_block);
-    }
-    builder.CreateRet(codegen->true_value());
-
-    builder.SetInsertPoint(row_rejected_block);
-    builder.CreateRet(codegen->false_value());
-
-    // Don't inline this function to avoid code bloat in ProcessScratchBatch().
-    // If there is any filter, EvalRuntimeFilters() is large enough to not 
benefit
-    // much from inlining.
-    eval_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline);
-  }
-
-  *fn = codegen->FinalizeFunction(eval_runtime_filters_fn);
-  if (*fn == nullptr) {
-    return Status("Codegen'd HdfsParquetScanner::EvalRuntimeFilters() failed "
-        "verification, see log");
-  }
-  return Status::OK();
-}
-
 bool HdfsParquetScanner::AssembleCollection(
     const vector<ParquetColumnReader*>& column_readers, int 
new_collection_rep_level,
     CollectionValueBuilder* coll_value_builder) {

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h 
b/be/src/exec/hdfs-parquet-scanner.h
index ccb109c..7fede3b 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -20,7 +20,6 @@
 #define IMPALA_EXEC_HDFS_PARQUET_SCANNER_H
 
 #include "codegen/impala-ir.h"
-#include "common/global-flags.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/parquet-common.h"
 #include "exec/parquet-scratch-tuple-batch.h"
@@ -362,14 +361,6 @@ class HdfsParquetScanner : public HdfsScanner {
   friend class ScalarColumnReader;
   friend class BoolColumnReader;
 
-  /// Size of the file footer.  This is a guess.  If this value is too little, 
we will
-  /// need to issue another read.
-  static const int64_t FOOTER_SIZE = 1024 * 100;
-  static_assert(FOOTER_SIZE <= READ_SIZE_MIN_VALUE,
-      "FOOTER_SIZE can not be greater than READ_SIZE_MIN_VALUE.\n"
-      "You can increase FOOTER_SIZE if you want, "
-      "just don't forget to increase READ_SIZE_MIN_VALUE as well.");
-
   /// Index of the current row group being processed. Initialized to -1 which 
indicates
   /// that we have not started processing the first row group yet (GetNext() 
has not yet
   /// been called).
@@ -391,41 +382,11 @@ class HdfsParquetScanner : public HdfsScanner {
   /// the scanner. Stored in 'obj_pool_'.
   vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
 
-  /// Cached runtime filter contexts, one for each filter that applies to this 
column,
-  /// owned by instances of this class.
-  vector<const FilterContext*> filter_ctxs_;
-
-  struct LocalFilterStats {
-    /// Total number of rows to which each filter was applied
-    int64_t considered;
-
-    /// Total number of rows that each filter rejected.
-    int64_t rejected;
-
-    /// Total number of rows that each filter could have been applied to (if 
it were
-    /// available from row 0).
-    int64_t total_possible;
-
-    /// Use known-width type to act as logical boolean.  Set to 1 if 
corresponding filter
-    /// in filter_ctxs_ should be applied, 0 if it was ineffective and was 
disabled.
-    uint8_t enabled;
-
-    /// Padding to ensure structs do not straddle cache-line boundary.
-    uint8_t padding[7];
-
-    LocalFilterStats() : considered(0), rejected(0), total_possible(0), 
enabled(1) { }
-  };
-
   /// Pool used for allocating caches of definition/repetition levels and 
tuples for
   /// dictionary filtering. The definition/repetition levels are populated by 
the
   /// level readers. The pool is freed in Close().
   boost::scoped_ptr<MemPool> perm_pool_;
 
-  /// Track statistics of each filter (one for each filter in filter_ctxs_) 
per scanner so
-  /// that expensive aggregation up to the scan node can be performed once, 
during
-  /// Close().
-  vector<LocalFilterStats> filter_stats_;
-
   /// Number of scratch batches processed so far.
   int64_t row_batches_produced_;
 
@@ -511,10 +472,6 @@ class HdfsParquetScanner : public HdfsScanner {
   Status EvaluateStatsConjuncts(const parquet::FileMetaData& file_metadata,
       const parquet::RowGroup& row_group, bool* skip_row_group) 
WARN_UNUSED_RESULT;
 
-  /// Check runtime filters' effectiveness every 
BATCHES_PER_FILTER_SELECTIVITY_CHECK
-  /// row batches. Will update 'filter_stats_'.
-  void CheckFiltersEffectiveness();
-
   /// Advances 'row_group_idx_' to the next non-empty row group and initializes
   /// the column readers to scan it. Recoverable errors are logged to the 
runtime
   /// state. Only returns a non-OK status if a non-recoverable error is 
encountered
@@ -548,24 +505,6 @@ class HdfsParquetScanner : public HdfsScanner {
   /// materialized tuples. This is a separate function so it can be codegened.
   int ProcessScratchBatch(RowBatch* dst_batch);
 
-  /// Evaluates 'row' against the i-th runtime filter for this scan node and 
returns
-  /// true if 'row' finds a match in the filter. Returns false otherwise.
-  bool EvalRuntimeFilter(int i, TupleRow* row);
-
-  /// Evaluates runtime filters (if any) against the given row. Returns true if
-  /// they passed, false otherwise. Maintains the runtime filter stats, 
determines
-  /// whether the filters are effective, and disables them if they are not. 
This is
-  /// replaced by generated code at runtime.
-  bool EvalRuntimeFilters(TupleRow* row);
-
-  /// Codegen EvalRuntimeFilters() by unrolling the loop in the interpreted 
version
-  /// and emitting a customized version of EvalRuntimeFilter() for each filter 
in
-  /// 'filter_ctxs'. Return error status on failure. The generated function is 
returned
-  /// via 'fn'.
-  static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
-      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn)
-      WARN_UNUSED_RESULT;
-
   /// Reads data using 'column_readers' to materialize the tuples of a 
CollectionValue
   /// allocated from 'coll_value_builder'. Increases 
'coll_items_read_counter_' by the
   /// number of items in this collection and descendant collections.
@@ -592,10 +531,6 @@ class HdfsParquetScanner : public HdfsScanner {
   inline bool ReadCollectionItem(const std::vector<ParquetColumnReader*>& 
column_readers,
       bool materialize_tuple, MemPool* pool, Tuple* tuple) const;
 
-  /// Find and return the last split in the file if it is assigned to this 
scan node.
-  /// Returns NULL otherwise.
-  static io::ScanRange* FindFooterSplit(HdfsFileDesc* file);
-
   /// Process the file footer and parse file_metadata_.  This should be called 
with the
   /// last FOOTER_SIZE bytes in context_.
   Status ProcessFooter() WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index 80cb6c5..81642bf 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -23,6 +23,7 @@
 #include "exec/hdfs-rcfile-scanner.h"
 #include "exec/hdfs-avro-scanner.h"
 #include "exec/hdfs-parquet-scanner.h"
+#include "exec/hdfs-orc-scanner.h"
 
 #include <avro/errors.h>
 #include <avro/schema.h>
@@ -457,6 +458,8 @@ Status 
HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
   // Issue initial ranges for all file types.
   RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
       matching_per_type_files[THdfsFileFormat::PARQUET]));
+  RETURN_IF_ERROR(HdfsOrcScanner::IssueInitialRanges(this,
+      matching_per_type_files[THdfsFileFormat::ORC]));
   RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
       matching_per_type_files[THdfsFileFormat::TEXT]));
   RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
@@ -588,6 +591,9 @@ Status 
HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
     case THdfsFileFormat::PARQUET:
       scanner->reset(new HdfsParquetScanner(this, runtime_state_));
       break;
+    case THdfsFileFormat::ORC:
+      scanner->reset(new HdfsOrcScanner(this, runtime_state_));
+      break;
     default:
       return Status(Substitute("Unknown Hdfs file format type: $0",
           partition->file_format()));

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 7ea4d80..d143e91 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -46,6 +46,7 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
   // because the scanner of the corresponding file format does implement 
GetNext().
   for (const auto& files: per_type_files_) {
     if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET
+        && files.first != THdfsFileFormat::ORC
         && files.first != THdfsFileFormat::TEXT) {
       stringstream msg;
       msg << "Unsupported file format with HdfsScanNodeMt: " << files.first;

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index ec1d2a3..0d34356 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -95,6 +95,20 @@ void StringToDecimalSymbolDummy() {
   StringToDecimal16(nullptr, 0, 0, 0, false, nullptr);
 }
 
+// Evaluates 'row' against the i-th runtime filter and updates filter_stats_[i].
+// 'total_possible' is bumped for every row. A filter that is disabled or not yet
+// available (HasFilter() is false) lets the row through without counting it as
+// 'considered'. Returns false only if the filter was applied and rejected the row.
+// This function is cross-compiled: HdfsScanner::CodegenEvalRuntimeFilters() replaces
+// the call to FilterContext::Eval() below and DCHECKs that exactly one call site was
+// replaced, so keep exactly one such call in this body.
+bool HdfsScanner::EvalRuntimeFilter(int i, TupleRow* row) {
+  LocalFilterStats* stats = &filter_stats_[i];
+  const FilterContext* ctx = filter_ctxs_[i];
+  ++stats->total_possible;
+  if (stats->enabled && ctx->filter->HasFilter()) {
+    ++stats->considered;
+    if (!ctx->Eval(row)) {
+      ++stats->rejected;
+      return false;
+    }
+  }
+  return true;
+}
+
 // Define the string parsing functions for llvm.  Stamp out the templated 
functions
 #ifdef IR_COMPILE
 using ParseResult = StringParser::ParseResult;

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 1809fe5..a4aee4d 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -35,10 +35,15 @@
 #include "common/names.h"
 
 using namespace impala;
+using namespace impala::io;
 using namespace strings;
 
+// The value is a fraction in [0, 1] that CheckFiltersEffectiveness() compares with
+// rejected / considered; e.g. the default 0.1 means 10% of the considered rows.
+DEFINE_double(min_filter_reject_ratio, 0.1, "(Advanced) If the fraction of rows "
+    "rejected by a runtime filter drops below this value, the filter is disabled.");
+
 const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation";
 const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner";
+// Out-of-line definition of the in-class-initialized constant. It is required because
+// FOOTER_SIZE is odr-used when it binds to min()'s const-reference parameters in
+// IssueFooterRanges().
+const int64_t HdfsScanner::FOOTER_SIZE;
 
 HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     : scan_node_(scan_node),
@@ -587,6 +592,96 @@ Status HdfsScanner::CodegenInitTuple(
   return Status::OK();
 }
 
+// Example of the IR generated for a scan with a single runtime filter:
+//
+// ; Function Attrs: noinline
+// define i1 @EvalRuntimeFilters(%"class.impala::HdfsScanner"* %this,
+//                               %"class.impala::TupleRow"* %row) #34 {
+// entry:
+//   %0 = call i1 @_ZN6impala11HdfsScanner17EvalRuntimeFilterEiPNS_8TupleRowE.2(
+//       %"class.impala::HdfsScanner"* %this, i32 0, %"class.impala::TupleRow"*
+//       %row)
+//   br i1 %0, label %continue, label %row_rejected
+//
+// row_rejected:                                     ; preds = %entry
+//   ret i1 false
+//
+// continue:                                         ; preds = %entry
+//   ret i1 true
+// }
+//
+// Each call above targets a separate copy of the cross-compiled EvalRuntimeFilter()
+// in which the call to FilterContext::Eval() has been replaced with the function that
+// FilterContext::CodegenEval() generated for that filter's expression.
+//
+// 'filter_exprs' holds one expression per runtime filter; the i-th call passes index
+// i. On success '*fn' receives the finalized function. On any error '*fn' is left as
+// nullptr and a non-OK status is returned.
+Status HdfsScanner::CodegenEvalRuntimeFilters(
+    LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, 
llvm::Function** fn) {
+  llvm::LLVMContext& context = codegen->context();
+  LlvmBuilder builder(context);
+
+  *fn = nullptr;
+  llvm::Type* this_type = codegen->GetStructPtrType<HdfsScanner>();
+  llvm::PointerType* tuple_row_ptr_type = 
codegen->GetStructPtrType<TupleRow>();
+  // Signature: bool EvalRuntimeFilters(HdfsScanner* this, TupleRow* row).
+  LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters",
+                                     codegen->bool_type());
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("this", this_type));
+  prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
+
+  llvm::Value* args[2];
+  llvm::Function* eval_runtime_filters_fn = 
prototype.GeneratePrototype(&builder, args);
+  llvm::Value* this_arg = args[0];
+  llvm::Value* row_arg = args[1];
+
+  int num_filters = filter_exprs.size();
+  if (num_filters == 0) {
+    // No filters: every row passes, so the generated function is just 'return true'.
+    builder.CreateRet(codegen->true_value());
+  } else {
+    // row_rejected_block: jump target for when a filter is evaluated to false.
+    llvm::BasicBlock* row_rejected_block =
+        llvm::BasicBlock::Create(context, "row_rejected", 
eval_runtime_filters_fn);
+
+    DCHECK_GT(num_filters, 0);
+    for (int i = 0; i < num_filters; ++i) {
+      // Fetch a fresh copy of the cross-compiled EvalRuntimeFilter() for this filter:
+      // the DCHECK_EQ(replaced, 1) below needs an untouched copy on every iteration.
+      // NOTE(review): presumably the 'true' argument asks GetFunction() for a clone;
+      // confirm against LlvmCodeGen::GetFunction().
+      llvm::Function* eval_runtime_filter_fn =
+          codegen->GetFunction(IRFunction::HDFS_SCANNER_EVAL_RUNTIME_FILTER, 
true);
+      DCHECK(eval_runtime_filter_fn != nullptr);
+
+      // Codegen function for inlining filter's expression evaluation and constant
+      // fold the type of the expression into the hashing function to avoid branches.
+      llvm::Function* eval_one_filter_fn;
+      DCHECK(filter_exprs[i] != nullptr);
+      RETURN_IF_ERROR(FilterContext::CodegenEval(codegen, filter_exprs[i],
+          &eval_one_filter_fn));
+      DCHECK(eval_one_filter_fn != nullptr);
+
+      // Swap the generic FilterContext::Eval() call inside the copy (matched by the
+      // mangled-name fragment "FilterContext4Eval") for the specialized evaluator.
+      int replaced = codegen->ReplaceCallSites(eval_runtime_filter_fn, 
eval_one_filter_fn,
+          "FilterContext4Eval");
+      DCHECK_EQ(replaced, 1);
+
+      // Emit: if (!<copy of EvalRuntimeFilter>(this, i, row)) goto row_rejected;
+      llvm::Value* idx = codegen->GetI32Constant(i);
+      llvm::Value* passed_filter = builder.CreateCall(
+          eval_runtime_filter_fn, llvm::ArrayRef<llvm::Value*>({this_arg, idx, 
row_arg}));
+
+      llvm::BasicBlock* continue_block =
+          llvm::BasicBlock::Create(context, "continue", 
eval_runtime_filters_fn);
+      builder.CreateCondBr(passed_filter, continue_block, row_rejected_block);
+      builder.SetInsertPoint(continue_block);
+    }
+    // Only reached when every filter accepted the row.
+    builder.CreateRet(codegen->true_value());
+
+    builder.SetInsertPoint(row_rejected_block);
+    builder.CreateRet(codegen->false_value());
+
+    // Don't inline this function to avoid code bloat in ProcessScratchBatch().
+    // If there is any filter, EvalRuntimeFilters() is large enough to not benefit
+    // much from inlining.
+    eval_runtime_filters_fn->addFnAttr(llvm::Attribute::NoInline);
+  }
+
+  *fn = codegen->FinalizeFunction(eval_runtime_filters_fn);
+  if (*fn == nullptr) {
+    return Status("Codegen'd HdfsScanner::EvalRuntimeFilters() failed "
+        "verification, see log");
+  }
+  return Status::OK();
+}
+
 Status HdfsScanner::UpdateDecompressor(const THdfsCompression::type& 
compression) {
   // Check whether the file in the stream has different compression from the 
last one.
   if (compression != decompression_type_) {
@@ -671,3 +766,87 @@ void HdfsScanner::ReportColumnParseError(const 
SlotDescriptor* desc,
     if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = 
Status(ss.str());
   }
 }
+
+// Disables (for this scanner only) every runtime filter that can never reject a row,
+// or whose observed rejection ratio (rejected / considered) is below
+// --min_filter_reject_ratio. A disabled filter is skipped by EvalRuntimeFilter().
+void HdfsScanner::CheckFiltersEffectiveness() {
+  for (int i = 0; i < filter_stats_.size(); ++i) {
+    LocalFilterStats* stats = &filter_stats_[i];
+    const RuntimeFilter* filter = filter_ctxs_[i]->filter;
+    if (filter->AlwaysTrue()) {
+      // Evaluating a filter that accepts everything is pure overhead.
+      stats->enabled = 0;
+      continue;
+    }
+    // No row has been checked against this filter yet (e.g. HasFilter() was false
+    // so far), so there is no evidence either way; keep it enabled. This guard also
+    // avoids computing 0 / 0.0, which is undefined behaviour in C++ and previously
+    // only "worked" because NaN compares false under IEEE 754.
+    if (stats->considered == 0) continue;
+    double reject_ratio = stats->rejected / static_cast<double>(stats->considered);
+    if (reject_ratio < FLAGS_min_filter_reject_ratio) stats->enabled = 0;
+  }
+}
+
+// Issues the footer scan ranges for 'files', which are all of type 'file_type'.
+// For each split assigned to this scan node, a range covering the last
+// min(FOOTER_SIZE, file_length) bytes of the file is created, with the original split
+// attached to it. For zero-slot scans (e.g. count(*)) only the split that contains the
+// end of the file gets such a range; every other split is reported to 'scan_node' as
+// complete right away. All footer ranges are handed to the scan node in a single
+// AddDiskIoRanges() call.
+Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
+    const THdfsFileFormat::type& file_type, const vector<HdfsFileDesc*>& 
files) {
+  vector<ScanRange*> footer_ranges;
+  for (int i = 0; i < files.size(); ++i) {
+    // Compute the offset of the file footer. A file shorter than FOOTER_SIZE is read
+    // in full.
+    int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
+    int64_t footer_start = files[i]->file_length - footer_size;
+    DCHECK_GE(footer_start, 0);
+
+    // Try to find the split with the footer. This is nullptr if that split is not
+    // assigned to this scan node.
+    ScanRange* footer_split = FindFooterSplit(files[i]);
+
+    for (int j = 0; j < files[i]->splits.size(); ++j) {
+      ScanRange* split = files[i]->splits[j];
+
+      DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
+      // If there are no materialized slots (such as count(*) over the table), we can
+      // get the result with the file metadata alone and don't need to read any row
+      // groups. We only want a single node to process the file footer in this case,
+      // which is the node with the footer split.  If it's not a count(*), we create a
+      // footer range for the split always.
+      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
+        ScanRangeMetadata* split_metadata =
+            static_cast<ScanRangeMetadata*>(split->meta_data());
+        // Each split is processed by first issuing a scan range for the file footer,
+        // which is done here, followed by scan ranges for the columns of each row
+        // group within the actual split (in InitColumns()). The original split is
+        // stored in the metadata associated with the footer range.
+        // NOTE(review): the row-group / InitColumns() wording is Parquet-specific;
+        // confirm how the ORC scanner proceeds after its footer range.
+        ScanRange* footer_range;
+        if (footer_split != nullptr) {
+          // The footer split is assigned to this node: reuse its disk id, locality
+          // and caching option for the footer read.
+          footer_range = scan_node->AllocateScanRange(files[i]->fs,
+              files[i]->filename.c_str(), footer_size, footer_start,
+              split_metadata->partition_id, footer_split->disk_id(),
+              footer_split->expected_local(),
+              BufferOpts(footer_split->try_cache(), files[i]->mtime), split);
+        } else {
+          // If we did not find the last split, we know it is going to be a remote
+          // read (disk id -1, not expected local, uncached).
+          footer_range =
+              scan_node->AllocateScanRange(files[i]->fs, 
files[i]->filename.c_str(),
+                   footer_size, footer_start, split_metadata->partition_id, 
-1, false,
+                   BufferOpts::Uncached(), split);
+        }
+
+        footer_ranges.push_back(footer_range);
+      } else {
+        // Zero-slot scan and this split does not hold the footer: nothing to read.
+        scan_node->RangeComplete(file_type, THdfsCompression::NONE);
+      }
+    }
+  }
+  // The threads that process the footer will also do the scan, so we mark all the
+  // files as complete here.
+  RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
+  return Status::OK();
+}
+
+// Returns the split of 'file' whose byte range ends exactly at the end of the file,
+// i.e. the split holding the footer, or nullptr when no split of 'file' assigned to
+// this scan node reaches the end.
+ScanRange* HdfsScanner::FindFooterSplit(HdfsFileDesc* file) {
+  DCHECK(file != nullptr);
+  for (ScanRange* range : file->splits) {
+    if (range->offset() + range->len() == file->file_length) return range;
+  }
+  return nullptr;
+}
+
+// Interpreted version of EvalRuntimeFilters(): a row survives only if every runtime
+// filter accepts it. Evaluation stops at the first rejecting filter, so the stats of
+// the filters after it are not updated for that row.
+bool HdfsScanner::EvalRuntimeFilters(TupleRow* row) {
+  const int filter_count = filter_ctxs_.size();
+  int filter_idx = 0;
+  while (filter_idx < filter_count) {
+    if (!EvalRuntimeFilter(filter_idx, row)) return false;
+    ++filter_idx;
+  }
+  return true;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 6497457..73b8b70 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -25,6 +25,7 @@
 #include <boost/scoped_ptr.hpp>
 
 #include "codegen/impala-ir.h"
+#include "common/global-flags.h"
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "exec/hdfs-scan-node-base.h"
@@ -44,6 +45,12 @@ class TextConverter;
 class TupleDescriptor;
 class SlotDescriptor;
 
+// The number of row batches between checks to see if a filter is effective, and
+// should be disabled (see HdfsScanner::CheckFiltersEffectiveness()). Must be a power
+// of two; this is enforced at compile time by the static_assert below.
+// NOTE(review): presumably callers test the batch counter with a bit mask rather than
+// a modulo, which is why a power of two is required. Confirm at the call sites.
+constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
+static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
+              "BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
+
 /// Intermediate structure used for two pass parsing approach. In the first 
pass,
 /// the FieldLocation structs are filled out and contain where all the fields 
start and
 /// their lengths.  In the second pass, the FieldLocation is used to write out 
the
@@ -287,6 +294,67 @@ class HdfsScanner {
   /// Jitted write tuples function pointer.  Null if codegen is disabled.
   WriteTuplesFn write_tuples_fn_ = nullptr;
 
+  /// Per-filter counters kept by each scanner, one entry per element of filter_ctxs_.
+  /// The fields add up to 32 bytes (3 x int64_t, 1 byte flag, 7 bytes of padding).
+  /// NOTE(review): the "do not straddle cache-line boundary" claim on 'padding'
+  /// presumably assumes 64-byte cache lines and 32-byte-aligned storage; a
+  /// std::vector only guarantees alignof(LocalFilterStats) (8 bytes). Confirm intent.
+  struct LocalFilterStats {
+    /// Total number of rows to which each filter was applied (i.e. while the filter
+    /// was enabled and available).
+    int64_t considered;
+
+    /// Total number of rows that each filter rejected.
+    int64_t rejected;
+
+    /// Total number of rows that each filter could have been applied to (if it were
+    /// available from row 0).
+    int64_t total_possible;
+
+    /// Use known-width type to act as logical boolean.  Set to 1 if corresponding filter
+    /// in filter_ctxs_ should be applied, 0 if it was ineffective and was disabled.
+    uint8_t enabled;
+
+    /// Padding to ensure structs do not straddle cache-line boundary.
+    uint8_t padding[7];
+
+    /// Counters start at zero and the filter starts enabled; 'padding' is left
+    /// uninitialized.
+    LocalFilterStats() : considered(0), rejected(0), total_possible(0), 
enabled(1) { }
+  };
+
+  /// Cached runtime filter contexts, one for each runtime filter that applies to this
+  /// scan. Index i here matches index i in 'filter_stats_'.
+  vector<const FilterContext *> filter_ctxs_;
+
+  /// Track statistics of each filter (one for each filter in filter_ctxs_) per scanner
+  /// so that expensive aggregation up to the scan node can be performed once, during
+  /// Close().
+  vector<LocalFilterStats> filter_stats_;
+
+  /// Size of the file footer for ORC and Parquet. This is a guess. If this value is too
+  /// small, we will need to issue another read.
+  static const int64_t FOOTER_SIZE = 1024 * 100;
+  static_assert(FOOTER_SIZE <= READ_SIZE_MIN_VALUE,
+      "FOOTER_SIZE can not be greater than READ_SIZE_MIN_VALUE.\n"
+      "You can increase FOOTER_SIZE if you want, "
+      "just don't forget to increase READ_SIZE_MIN_VALUE as well.");
+
+  /// Meant to be called every BATCHES_PER_FILTER_SELECTIVITY_CHECK row batches.
+  /// Disables (sets 'enabled' to 0 in 'filter_stats_') every filter that is always
+  /// true or whose reject ratio is below --min_filter_reject_ratio.
+  void CheckFiltersEffectiveness();
+
+  /// Evaluates 'row' against the i-th runtime filter for this scan node and returns
+  /// true if 'row' finds a match in the filter. Returns false otherwise. A disabled or
+  /// not-yet-available filter always returns true. Updates filter_stats_[i].
+  bool EvalRuntimeFilter(int i, TupleRow* row);
+
+  /// Evaluates runtime filters (if any) against the given row. Returns true if they
+  /// all passed, false otherwise (stopping at the first rejecting filter). The
+  /// per-filter stats are maintained through EvalRuntimeFilter(); disabling
+  /// ineffective filters is done separately by CheckFiltersEffectiveness(). This is
+  /// replaced by generated code at runtime.
+  bool EvalRuntimeFilters(TupleRow* row);
+
+  /// Find and return the last split in the file if it is assigned to this scan node.
+  /// Returns nullptr otherwise.
+  static io::ScanRange* FindFooterSplit(HdfsFileDesc* file);
+
+  /// Issue just the footer range for each file. This function is only used in parquet
+  /// and orc scanners. We'll then parse the footer and pick out the columns we want.
+  /// For zero-slot scans only the split that contains the end of the file gets a
+  /// footer range; the other splits are marked complete immediately.
+  static Status IssueFooterRanges(HdfsScanNodeBase* scan_node,
+      const THdfsFileFormat::type& file_type, const 
std::vector<HdfsFileDesc*>& files)
+      WARN_UNUSED_RESULT;
+
   /// Implements GetNext(). Should be overridden by subclasses.
   /// Only valid to call if the parent scan node is multi-threaded.
   virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT {
@@ -420,6 +488,14 @@ class HdfsScanner {
   static Status CodegenInitTuple(
       const HdfsScanNodeBase* node, LlvmCodeGen* codegen, llvm::Function** 
init_tuple_fn);
 
+  /// Codegen EvalRuntimeFilters() by unrolling the loop in the interpreted version
+  /// and emitting a customized version of EvalRuntimeFilter() for each expression in
+  /// 'filter_exprs' (the i-th expression is evaluated with filter index i). Return
+  /// error status on failure. The generated function is returned via 'fn'; '*fn' is
+  /// set to nullptr whenever an error status is returned.
+  static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
+      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn)
+      WARN_UNUSED_RESULT;
+
   /// Report parse error for column @ desc.   If abort_on_error is true, sets
   /// parse_status_ to the error message.
   void ReportColumnParseError(const SlotDescriptor* desc, const char* data, 
int len);

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc 
b/be/src/util/backend-gflag-util.cc
index 0bbaa89..02e1ed8 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -27,6 +27,7 @@
 DECLARE_bool(load_catalog_in_background);
 DECLARE_bool(load_auth_to_local_rules);
 DECLARE_bool(enable_stats_extrapolation);
+DECLARE_bool(enable_orc_scanner);
 DECLARE_int32(non_impala_java_vlog);
 DECLARE_int32(num_metadata_loading_threads);
 DECLARE_int32(max_hdfs_partitions_parallel_load);
@@ -56,6 +57,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* 
cfg_bytes) {
   TBackendGflags cfg;
   cfg.__set_authorization_policy_file(FLAGS_authorization_policy_file);
   cfg.__set_load_catalog_in_background(FLAGS_load_catalog_in_background);
+  cfg.__set_enable_orc_scanner(FLAGS_enable_orc_scanner);
   cfg.__set_server_name(FLAGS_server_name);
   cfg.__set_sentry_config(FLAGS_sentry_config);
   cfg.__set_authorization_policy_provider_class(

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/bin/bootstrap_toolchain.py
----------------------------------------------------------------------
diff --git a/bin/bootstrap_toolchain.py b/bin/bootstrap_toolchain.py
index a09c905..f54bf04 100755
--- a/bin/bootstrap_toolchain.py
+++ b/bin/bootstrap_toolchain.py
@@ -429,7 +429,7 @@ if __name__ == "__main__":
   packages = map(Package, ["llvm", "kudu",
       "avro", "binutils", "boost", "breakpad", "bzip2", "cmake", "crcutil",
       "flatbuffers", "gcc", "gflags", "glog", "gperftools", "gtest", "libev",
-      "lz4", "openldap", "openssl", "protobuf",
+      "lz4", "openldap", "openssl", "orc", "protobuf",
       "rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib"])
   packages.insert(0, Package("llvm", "5.0.1-asserts"))
   bootstrap(toolchain_root, packages)

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b2d9d15..f113e53 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -127,6 +127,8 @@ export IMPALA_OPENLDAP_VERSION=2.4.25
 unset IMPALA_OPENLDAP_URL
 export IMPALA_OPENSSL_VERSION=1.0.2l
 unset IMPALA_OPENSSL_URL
+export IMPALA_ORC_VERSION=1.4.3-p2
+unset IMPALA_ORC_URL
 export IMPALA_PROTOBUF_VERSION=2.6.1
 unset IMPALA_PROTOBUF_URL
 export IMPALA_POSTGRES_JDBC_DRIVER_VERSION=9.0-801

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/cmake_modules/FindOrc.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindOrc.cmake b/cmake_modules/FindOrc.cmake
new file mode 100644
index 0000000..ef06396
--- /dev/null
+++ b/cmake_modules/FindOrc.cmake
@@ -0,0 +1,55 @@
+##############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+##############################################################################
+
+# - Find Orc (headers and liborc.a) with ORC_ROOT hinting a location
+# This module defines
+#  ORC_INCLUDE_DIR, directory containing headers
+#  ORC_STATIC_LIB, path to liborc.a
+#  ORC_FOUND, TRUE; configuration aborts if either of the above is missing
+set(ORC_SEARCH_HEADER_PATHS
+  ${ORC_ROOT}/include
+  $ENV{IMPALA_HOME}/thirdparty/orc-$ENV{IMPALA_ORC_VERSION}/build/include)
+
+set(ORC_SEARCH_LIB_PATH
+  ${ORC_ROOT}/lib
+  $ENV{IMPALA_HOME}/thirdparty/orc-$ENV{IMPALA_ORC_VERSION}/build/lib)
+
+# Search only the hinted locations (NO_DEFAULT_PATH) for both headers and library:
+# make sure we don't accidentally pick up a different version from the system, or
+# headers and a library that belong to different ORC builds.
+find_path(ORC_INCLUDE_DIR NAMES orc/OrcFile.hh OrcFile.hh
+  PATHS ${ORC_SEARCH_HEADER_PATHS} NO_DEFAULT_PATH)
+find_library(ORC_STATIC_LIB NAMES liborc.a
+  PATHS ${ORC_SEARCH_LIB_PATH} NO_DEFAULT_PATH)
+
+# Both the headers and the static library are required; a missing include dir
+# would otherwise only surface later as a confusing compile error.
+if(NOT ORC_INCLUDE_DIR OR NOT ORC_STATIC_LIB)
+  message(FATAL_ERROR "ORC includes and libraries NOT found. "
+    "Looked for headers in ${ORC_SEARCH_HEADER_PATHS}, "
+    "and for libs in ${ORC_SEARCH_LIB_PATH}")
+endif()
+# FATAL_ERROR above stops configuration, so reaching here means ORC was found.
+set(ORC_FOUND TRUE)
+
+mark_as_advanced(
+  ORC_INCLUDE_DIR
+  ORC_STATIC_LIB
+  ORC_FOUND
+)

http://git-wip-us.apache.org/repos/asf/impala/blob/23743baa/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift 
b/common/thrift/BackendGflags.thrift
index 412ca06..c98f50a 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -73,4 +73,6 @@ struct TBackendGflags {
   23: required double max_filter_error_rate
 
   24: required i64 min_buffer_size
+
+  25: required bool enable_orc_scanner
 }

Reply via email to