http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 8de7c63..a94359b 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -31,7 +31,7 @@
 #include "exec/parquet-column-readers.h"
 #include "exec/parquet-column-stats.h"
 #include "exec/scanner-context.inline.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime-state.h"
@@ -191,8 +191,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   num_dict_filtered_row_groups_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
   process_footer_timer_stats_ =
-      ADD_SUMMARY_STATS_TIMER(
-          scan_node_->runtime_profile(), "FooterProcessingTime");
+      ADD_SUMMARY_STATS_TIMER(scan_node_->runtime_profile(), "FooterProcessingTime");
 
   codegend_process_scratch_batch_fn_ = reinterpret_cast<ProcessScratchBatchFn>(
       scan_node_->GetCodegenFn(THdfsFileFormat::PARQUET));
@@ -209,15 +208,16 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
   if (min_max_tuple_desc != nullptr) {
     int64_t tuple_size = min_max_tuple_desc->byte_size();
     if (!min_max_tuple_buffer_.TryAllocate(tuple_size)) {
-      return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
-            "statistics tuple for file '$1'.", tuple_size, filename()));
+      string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
+          "statistics tuple for file '$1'.", tuple_size, filename());
+      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, tuple_size);
     }
   }
 
   // Clone the min/max statistics conjuncts.
-  RETURN_IF_ERROR(Expr::CloneIfNotExists(scan_node_->min_max_conjunct_ctxs(),
-      state_, &min_max_conjuncts_ctxs_));
-  min_max_conjuncts_ctxs_to_eval_.reserve(min_max_conjuncts_ctxs_.size());
+  RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
+      expr_mem_pool_.get(), scan_node_->min_max_conjunct_evals(),
+      &min_max_conjunct_evals_));
 
   for (int i = 0; i < context->filter_ctxs().size(); ++i) {
     const FilterContext* ctx = &context->filter_ctxs()[i];
@@ -318,7 +318,7 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
 
   if (schema_resolver_.get() != nullptr) schema_resolver_.reset();
 
-  Expr::Close(min_max_conjuncts_ctxs_, state_);
+  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);
 
   for (int i = 0; i < filter_ctxs_.size(); ++i) {
     const FilterStats* stats = filter_ctxs_[i]->stats;
@@ -505,12 +505,10 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
   Tuple* min_max_tuple = reinterpret_cast<Tuple*>(min_max_tuple_buffer_.buffer());
   min_max_tuple->Init(tuple_size);
 
-  DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjuncts_ctxs_.size());
-
-  min_max_conjuncts_ctxs_to_eval_.clear();
-  for (int i = 0; i < min_max_conjuncts_ctxs_.size(); ++i) {
+  DCHECK_EQ(min_max_tuple_desc->slots().size(), min_max_conjunct_evals_.size());
+  for (int i = 0; i < min_max_conjunct_evals_.size(); ++i) {
     SlotDescriptor* slot_desc = min_max_tuple_desc->slots()[i];
-    ExprContext* conjunct = min_max_conjuncts_ctxs_[i];
+    ScalarExprEvaluator* eval = min_max_conjunct_evals_[i];
 
     // Resolve column path to determine col idx.
     SchemaNode* node = nullptr;
@@ -548,7 +546,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
     const ColumnType& col_type = slot_desc->type();
     bool stats_read = false;
     void* slot = min_max_tuple->GetSlot(slot_desc->tuple_offset());
-    const string& fn_name = conjunct->root()->function_name();
+    const string& fn_name = eval->root().function_name();
     if (fn_name == "lt" || fn_name == "le") {
       // We need to get min stats.
       stats_read = ColumnStatsBase::ReadFromThrift(
@@ -561,18 +559,15 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
       DCHECK(false) << "Unsupported function name for statistics evaluation: " << fn_name;
     }
 
-    if (stats_read) min_max_conjuncts_ctxs_to_eval_.push_back(conjunct);
-  }
-
-  if (!min_max_conjuncts_ctxs_to_eval_.empty()) {
-    TupleRow row;
-    row.SetTuple(0, min_max_tuple);
-    if (!ExecNode::EvalConjuncts(&min_max_conjuncts_ctxs_to_eval_[0],
-          min_max_conjuncts_ctxs_to_eval_.size(), &row)) {
-      *skip_row_group = true;
+    if (stats_read) {
+      TupleRow row;
+      row.SetTuple(0, min_max_tuple);
+      if (!ExecNode::EvalPredicate(eval, &row)) {
+        *skip_row_group = true;
+        return Status::OK();
+      }
     }
   }
-
   return Status::OK();
 }
 
@@ -724,8 +719,8 @@ bool HdfsParquetScanner::IsDictFilterable(ParquetColumnReader* col_reader) {
   // rather than materializing the values.
   if (!slot_desc) return false;
   // Does this column reader have any dictionary filter conjuncts?
-  auto dict_filter_it = scanner_dict_filter_map_.find(slot_desc->id());
-  if (dict_filter_it == scanner_dict_filter_map_.end()) return false;
+  auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
+  if (dict_filter_it == dict_filter_map_.end()) return false;
 
   // Certain datatypes (chars, timestamps) do not have the appropriate value in the
   // file format and must be converted before return. This is true for the
@@ -875,9 +870,10 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
         dictionary->num_entries() >= LEGACY_IMPALA_MAX_DICT_ENTRIES) continue;
 
     const SlotDescriptor* slot_desc = scalar_reader->slot_desc();
-    auto dict_filter_it = scanner_dict_filter_map_.find(slot_desc->id());
-    DCHECK(dict_filter_it != scanner_dict_filter_map_.end());
-    vector<ExprContext*>& dict_filter_conjunct_ctxs = dict_filter_it->second;
+    auto dict_filter_it = dict_filter_map_.find(slot_desc->id());
+    DCHECK(dict_filter_it != dict_filter_map_.end());
+    const vector<ScalarExprEvaluator*>& dict_filter_conjunct_evals =
+        dict_filter_it->second;
     void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
     bool column_has_match = false;
     for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
@@ -887,8 +883,8 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
       // If any dictionary value passes the conjuncts, then move on to the next column.
       TupleRow row;
       row.SetTuple(0, dict_filter_tuple);
-      if (ExecNode::EvalConjuncts(dict_filter_conjunct_ctxs.data(),
-          dict_filter_conjunct_ctxs.size(), &row)) {
+      if (ExecNode::EvalConjuncts(dict_filter_conjunct_evals.data(),
+              dict_filter_conjunct_evals.size(), &row)) {
         column_has_match = true;
         break;
       }
@@ -998,8 +994,8 @@ Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) {
   // with parse_status_.
   RETURN_IF_ERROR(state_->GetQueryStatus());
   // Free local expr allocations for this thread
-  for (const auto& kv: scanner_conjuncts_map_) {
-    ExprContext::FreeLocalAllocations(kv.second);
+  for (const auto& kv: conjunct_evals_map_) {
+    ScalarExprEvaluator::FreeLocalAllocations(kv.second);
   }
   return Status::OK();
 }
@@ -1018,7 +1014,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
     // output batch per remaining scratch tuple and return. No need to evaluate
     // filters/conjuncts.
     DCHECK(filter_ctxs_.empty());
-    DCHECK(scanner_conjunct_ctxs_->empty());
+    DCHECK(conjunct_evals_->empty());
     int num_tuples = min(dst_batch->capacity() - dst_batch->num_rows(),
         scratch_batch_->num_tuples - scratch_batch_->tuple_idx);
     memset(output_row, 0, num_tuples * sizeof(Tuple*));
@@ -1040,8 +1036,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* dst_batch) {
 }
 
 Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ExprContext*>& conjunct_ctxs, const vector<FilterContext>& filter_ctxs,
-    Function** process_scratch_batch_fn) {
+    const vector<ScalarExpr*>& conjuncts, Function** process_scratch_batch_fn) {
   DCHECK(node->runtime_state()->ShouldCodegen());
   *process_scratch_batch_fn = NULL;
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
@@ -1052,8 +1047,7 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
   DCHECK(fn != NULL);
 
   Function* eval_conjuncts_fn;
-  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjunct_ctxs,
-      &eval_conjuncts_fn));
+  RETURN_IF_ERROR(ExecNode::CodegenEvalConjuncts(codegen, conjuncts, &eval_conjuncts_fn));
   DCHECK(eval_conjuncts_fn != NULL);
 
   int replaced = codegen->ReplaceCallSites(fn, eval_conjuncts_fn, "EvalConjuncts");
@@ -1061,7 +1055,7 @@ Status HdfsParquetScanner::Codegen(HdfsScanNodeBase* node,
 
   Function* eval_runtime_filters_fn;
   RETURN_IF_ERROR(CodegenEvalRuntimeFilters(
-      codegen, filter_ctxs, &eval_runtime_filters_fn));
+      codegen, node->filter_exprs(), &eval_runtime_filters_fn));
   DCHECK(eval_runtime_filters_fn != NULL);
 
   replaced = codegen->ReplaceCallSites(fn, eval_runtime_filters_fn, "EvalRuntimeFilters");
@@ -1101,11 +1095,11 @@ bool HdfsParquetScanner::EvalRuntimeFilters(TupleRow* row) {
 // EvalRuntimeFilter() is the same as the cross-compiled version except EvalOneFilter()
 // is replaced with the one generated by CodegenEvalOneFilter().
 Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
-    const vector<FilterContext>& filter_ctxs, Function** fn) {
+    const vector<ScalarExpr*>& filter_exprs, Function** fn) {
   LLVMContext& context = codegen->context();
   LlvmBuilder builder(context);
 
-  *fn = NULL;
+  *fn = nullptr;
   Type* this_type = codegen->GetPtrType(HdfsParquetScanner::LLVM_CLASS_NAME);
   PointerType* tuple_row_ptr_type = codegen->GetPtrType(TupleRow::LLVM_CLASS_NAME);
   LlvmCodeGen::FnPrototype prototype(codegen, "EvalRuntimeFilters",
@@ -1118,7 +1112,7 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
   Value* this_arg = args[0];
   Value* row_arg = args[1];
 
-  int num_filters = filter_ctxs.size();
+  int num_filters = filter_exprs.size();
   if (num_filters == 0) {
     builder.CreateRet(codegen->true_value());
   } else {
@@ -1130,13 +1124,15 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
     for (int i = 0; i < num_filters; ++i) {
       Function* eval_runtime_filter_fn =
           codegen->GetFunction(IRFunction::PARQUET_SCANNER_EVAL_RUNTIME_FILTER, true);
-      DCHECK(eval_runtime_filter_fn != NULL);
+      DCHECK(eval_runtime_filter_fn != nullptr);
 
       // Codegen function for inlining filter's expression evaluation and constant fold
       // the type of the expression into the hashing function to avoid branches.
       Function* eval_one_filter_fn;
-      RETURN_IF_ERROR(filter_ctxs[i].CodegenEval(codegen, &eval_one_filter_fn));
-      DCHECK(eval_one_filter_fn != NULL);
+      DCHECK(filter_exprs[i] != nullptr);
+      RETURN_IF_ERROR(FilterContext::CodegenEval(codegen, filter_exprs[i],
+          &eval_one_filter_fn));
+      DCHECK(eval_one_filter_fn != nullptr);
 
       int replaced = codegen->ReplaceCallSites(eval_runtime_filter_fn, eval_one_filter_fn,
           "FilterContext4Eval");
@@ -1163,7 +1159,7 @@ Status HdfsParquetScanner::CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
   }
 
   *fn = codegen->FinalizeFunction(eval_runtime_filters_fn);
-  if (*fn == NULL) {
+  if (*fn == nullptr) {
     return Status("Codegen'd HdfsParquetScanner::EvalRuntimeFilters() failed "
         "verification, see log");
   }
@@ -1179,7 +1175,8 @@ bool HdfsParquetScanner::AssembleCollection(
 
   const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
   Tuple* template_tuple = template_tuple_map_[tuple_desc];
-  const vector<ExprContext*> conjunct_ctxs = scanner_conjuncts_map_[tuple_desc->id()];
+  const vector<ScalarExprEvaluator*> evals =
+      conjunct_evals_map_[tuple_desc->id()];
 
   int64_t rows_read = 0;
   bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
@@ -1227,7 +1224,7 @@ bool HdfsParquetScanner::AssembleCollection(
       end_of_collection = column_readers[0]->rep_level() <= new_collection_rep_level;
 
       if (materialize_tuple) {
-        if (ExecNode::EvalConjuncts(&conjunct_ctxs[0], conjunct_ctxs.size(), row)) {
+        if (ExecNode::EvalConjuncts(&evals[0], evals.size(), row)) {
           tuple = next_tuple(tuple_desc->byte_size(), tuple);
           ++num_to_commit;
         }
@@ -1345,8 +1342,9 @@ Status HdfsParquetScanner::ProcessFooter() {
     }
 
     if (!metadata_buffer.TryAllocate(metadata_size)) {
-      return Status(Substitute("Could not allocate buffer of $0 bytes for Parquet "
-          "metadata for file '$1'.", metadata_size, filename()));
+      string details = Substitute("Could not allocate buffer of $0 bytes for Parquet "
+          "metadata for file '$1'.", metadata_size, filename());
+      return scan_node_->mem_tracker()->MemLimitExceeded(state_, details, metadata_size);
     }
     metadata_ptr = metadata_buffer.buffer();
     DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
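
Taken together, the hunks above move the Parquet scanner's min/max statistics filtering from ExprContext to ScalarExprEvaluator. A rough sketch of the resulting flow, using only calls that appear in this patch (variable names are illustrative, not new API):

    // Open(): clone the scan node's evaluators; the clones live in obj_pool_,
    // allocate from expr_mem_pool_, and are closed again in Close().
    RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
        expr_mem_pool_.get(), scan_node_->min_max_conjunct_evals(),
        &min_max_conjunct_evals_));

    // EvaluateStatsConjuncts(): read the min or max statistic for each slot into
    // the stats tuple, then evaluate the matching conjunct; a failed conjunct
    // lets the whole row group be skipped immediately.
    TupleRow row;
    row.SetTuple(0, min_max_tuple);
    if (!ExecNode::EvalPredicate(eval, &row)) {
      *skip_row_group = true;
      return Status::OK();
    }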

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 59a06bc..ad86dc8 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -337,8 +337,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Codegen ProcessScratchBatch(). Stores the resulting function in
   /// 'process_scratch_batch_fn' if codegen was successful or NULL otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ExprContext*>& conjunct_ctxs,
-      const std::vector<FilterContext>& filter_ctxs,
+      const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** process_scratch_batch_fn);
 
   /// The repetition level is set to this value to indicate the end of a row group.
@@ -379,13 +378,9 @@ class HdfsParquetScanner : public HdfsScanner {
   /// Buffer to back tuples when reading parquet::Statistics.
   ScopedBuffer min_max_tuple_buffer_;
 
-  /// Min/max statistics contexts, owned by HdfsScanner::state_->obj_pool_.
-  vector<ExprContext*> min_max_conjuncts_ctxs_;
-
-  /// Used in EvaluateRowGroupStats() to store non-owning copies of conjunct pointers from
-  /// 'min_max_conjunct_ctxs_'. It is declared here to avoid the dynamic allocation
-  /// overhead.
-  vector<ExprContext*> min_max_conjuncts_ctxs_to_eval_;
+  /// Clone of Min/max statistics conjunct evaluators. Has the same life time as
+  /// the scanner. Stored in 'obj_pool_'.
+  vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
 
   /// Cached runtime filter contexts, one for each filter that applies to this column,
   /// owned by instances of this class.
@@ -544,7 +539,7 @@ class HdfsParquetScanner : public HdfsScanner {
   /// 'filter_ctxs'. Return error status on failure. The generated function is returned
   /// via 'fn'.
   static Status CodegenEvalRuntimeFilters(LlvmCodeGen* codegen,
-      const std::vector<FilterContext>& filter_ctxs, llvm::Function** fn);
+      const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
 
   /// Reads data using 'column_readers' to materialize the tuples of a CollectionValue
   /// allocated from 'coll_value_builder'.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index a3aa4d3..cc62ef8 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -20,8 +20,8 @@
 #include "common/version.h"
 #include "exec/hdfs-table-sink.h"
 #include "exec/parquet-column-stats.inline.h"
-#include "exprs/expr-context.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "rpc/thrift-util.h"
 #include "runtime/decimal-value.h"
 #include "runtime/mem-tracker.h"
@@ -88,10 +88,10 @@ namespace impala {
 class HdfsParquetTableWriter::BaseColumnWriter {
  public:
   // expr - the expression to generate output values for this column.
-  BaseColumnWriter(HdfsParquetTableWriter* parent, ExprContext* expr_ctx,
+  BaseColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* expr_eval,
       const THdfsCompression::type& codec)
     : parent_(parent),
-      expr_ctx_(expr_ctx),
+      expr_eval_(expr_eval),
       codec_(codec),
       page_size_(DEFAULT_DATA_PAGE_SIZE),
       current_page_(nullptr),
@@ -172,7 +172,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
     if (dict_encoder_base_ != nullptr) dict_encoder_base_->ClearIndices();
   }
 
-  const ColumnType& type() const { return expr_ctx_->root()->type(); }
+  const ColumnType& type() const { return expr_eval_->root().type(); }
   uint64_t num_values() const { return num_values_; }
   uint64_t total_compressed_size() const { return total_compressed_byte_size_; }
   uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; }
@@ -222,7 +222,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   };
 
   HdfsParquetTableWriter* parent_;
-  ExprContext* expr_ctx_;
+  ScalarExprEvaluator* expr_eval_;
 
   THdfsCompression::type codec_;
 
@@ -285,13 +285,13 @@ template<typename T>
 class HdfsParquetTableWriter::ColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
-  ColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
+  ColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
       const THdfsCompression::type& codec)
-    : BaseColumnWriter(parent, ctx, codec),
+    : BaseColumnWriter(parent, eval, codec),
       num_values_since_dict_size_check_(0),
       plain_encoded_value_size_(
-          ParquetPlainEncoder::EncodedByteSize(ctx->root()->type())) {
-    DCHECK_NE(ctx->root()->type().type, TYPE_BOOLEAN);
+          ParquetPlainEncoder::EncodedByteSize(eval->root().type())) {
+    DCHECK_NE(eval->root().type().type, TYPE_BOOLEAN);
   }
 
   virtual void Reset() {
@@ -398,12 +398,12 @@ inline StringValue* HdfsParquetTableWriter::ColumnWriter<StringValue>::CastValue
 class HdfsParquetTableWriter::BoolColumnWriter :
     public HdfsParquetTableWriter::BaseColumnWriter {
  public:
-  BoolColumnWriter(HdfsParquetTableWriter* parent, ExprContext* ctx,
+  BoolColumnWriter(HdfsParquetTableWriter* parent, ScalarExprEvaluator* eval,
       const THdfsCompression::type& codec)
-    : BaseColumnWriter(parent, ctx, codec),
+    : BaseColumnWriter(parent, eval, codec),
       page_stats_(parent_->reusable_col_mem_pool_.get(), -1),
       row_group_stats_(parent_->reusable_col_mem_pool_.get(), -1) {
-    DCHECK_EQ(ctx->root()->type().type, TYPE_BOOLEAN);
+    DCHECK_EQ(eval->root().type().type, TYPE_BOOLEAN);
     bool_values_ = parent_->state_->obj_pool()->Add(
         new BitWriter(values_buffer_, values_buffer_len_));
     // Dictionary encoding doesn't make sense for bools and is not allowed by
@@ -449,7 +449,7 @@ class HdfsParquetTableWriter::BoolColumnWriter :
 
 inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row) {
   ++num_values_;
-  void* value = expr_ctx_->GetValue(row);
+  void* value = expr_eval_->GetValue(row);
   if (current_page_ == nullptr) NewPage();
 
   // Ensure that we have enough space for the definition level, but don't write it yet in
@@ -731,8 +731,8 @@ void HdfsParquetTableWriter::BaseColumnWriter::NewPage() {
 
 HdfsParquetTableWriter::HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
     OutputPartition* output, const HdfsPartitionDescriptor* part_desc,
-    const HdfsTableDescriptor* table_desc, const vector<ExprContext*>& output_expr_ctxs)
-  : HdfsTableWriter(parent, state, output, part_desc, table_desc, output_expr_ctxs),
+    const HdfsTableDescriptor* table_desc)
+  : HdfsTableWriter(parent, state, output, part_desc, table_desc),
     thrift_serializer_(new ThriftSerializer(true)),
     current_row_group_(nullptr),
     row_count_(0),
@@ -774,59 +774,51 @@ Status HdfsParquetTableWriter::Init() {
   // Initialize each column structure.
   for (int i = 0; i < columns_.size(); ++i) {
     BaseColumnWriter* writer = nullptr;
-    const ColumnType& type = output_expr_ctxs_[i]->root()->type();
+    const ColumnType& type = output_expr_evals_[i]->root().type();
     switch (type.type) {
       case TYPE_BOOLEAN:
-        writer = new BoolColumnWriter(
-            this, output_expr_ctxs_[i], codec);
+        writer = new BoolColumnWriter(this, output_expr_evals_[i], codec);
         break;
       case TYPE_TINYINT:
-        writer = new ColumnWriter<int8_t>(
-            this, output_expr_ctxs_[i], codec);
+        writer = new ColumnWriter<int8_t>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_SMALLINT:
-        writer = new ColumnWriter<int16_t>(
-            this, output_expr_ctxs_[i], codec);
+        writer = new ColumnWriter<int16_t>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_INT:
-        writer = new ColumnWriter<int32_t>(
-            this, output_expr_ctxs_[i], codec);
+        writer = new ColumnWriter<int32_t>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_BIGINT:
-        writer = new ColumnWriter<int64_t>(
-            this, output_expr_ctxs_[i], codec);
+        writer = new ColumnWriter<int64_t>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_FLOAT:
-        writer = new ColumnWriter<float>(
-            this, output_expr_ctxs_[i], codec);
+        writer = new ColumnWriter<float>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_DOUBLE:
-        writer = new ColumnWriter<double>(
-            this, output_expr_ctxs_[i], codec);
+        writer = new ColumnWriter<double>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_TIMESTAMP:
         writer = new ColumnWriter<TimestampValue>(
-            this, output_expr_ctxs_[i], codec);
+            this, output_expr_evals_[i], codec);
         break;
       case TYPE_VARCHAR:
       case TYPE_STRING:
       case TYPE_CHAR:
-        writer = new ColumnWriter<StringValue>(
-            this, output_expr_ctxs_[i], codec);
+        writer = new ColumnWriter<StringValue>(this, output_expr_evals_[i], codec);
         break;
       case TYPE_DECIMAL:
-        switch (output_expr_ctxs_[i]->root()->type().GetByteSize()) {
+        switch (output_expr_evals_[i]->root().type().GetByteSize()) {
           case 4:
             writer = new ColumnWriter<Decimal4Value>(
-                this, output_expr_ctxs_[i], codec);
+                this, output_expr_evals_[i], codec);
             break;
           case 8:
             writer = new ColumnWriter<Decimal8Value>(
-                this, output_expr_ctxs_[i], codec);
+                this, output_expr_evals_[i], codec);
             break;
           case 16:
             writer = new ColumnWriter<Decimal16Value>(
-                this, output_expr_ctxs_[i], codec);
+                this, output_expr_evals_[i], codec);
             break;
           default:
             DCHECK(false);
@@ -852,10 +844,10 @@ Status HdfsParquetTableWriter::CreateSchema() {
 
   for (int i = 0; i < columns_.size(); ++i) {
     parquet::SchemaElement& node = file_metadata_.schema[i + 1];
+    const ColumnType& type = output_expr_evals_[i]->root().type();
     node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    node.__set_type(IMPALA_TO_PARQUET_TYPES[output_expr_ctxs_[i]->root()->type().type]);
+    node.__set_type(IMPALA_TO_PARQUET_TYPES[type.type]);
     node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
-    const ColumnType& type = output_expr_ctxs_[i]->root()->type();
     if (type.type == TYPE_DECIMAL) {
       // This column is type decimal. Update the file metadata to include the
       // additional fields:
@@ -864,9 +856,9 @@ Status HdfsParquetTableWriter::CreateSchema() {
       //  3) precision/scale
       node.__set_converted_type(ConvertedType::DECIMAL);
       node.__set_type_length(
-          ParquetPlainEncoder::DecimalSize(output_expr_ctxs_[i]->root()->type()));
-      node.__set_scale(output_expr_ctxs_[i]->root()->type().scale);
-      node.__set_precision(output_expr_ctxs_[i]->root()->type().precision);
+          ParquetPlainEncoder::DecimalSize(output_expr_evals_[i]->root().type()));
+      node.__set_scale(output_expr_evals_[i]->root().type().scale);
+      node.__set_precision(output_expr_evals_[i]->root().type().precision);
     } else if (type.type == TYPE_VARCHAR || type.type == TYPE_CHAR ||
         (type.type == TYPE_STRING &&
          state_->query_options().parquet_annotate_strings_utf8)) {
@@ -887,7 +879,7 @@ Status HdfsParquetTableWriter::AddRowGroup() {
   current_row_group_->columns.resize(columns_.size());
   for (int i = 0; i < columns_.size(); ++i) {
     ColumnMetaData metadata;
-    metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->expr_ctx_->root()->type().type];
+    metadata.type = IMPALA_TO_PARQUET_TYPES[columns_[i]->type().type];
     metadata.path_in_schema.push_back(
         table_desc_->col_descs()[i + num_clustering_cols].name());
     metadata.codec = columns_[i]->codec();
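
The writer-side hunks above make the same substitution: each column writer now holds a ScalarExprEvaluator* and obtains both its output type and its values through it. A minimal sketch of the pattern, assuming only the members visible in this diff:

    // Column type comes from the root expression of the evaluator.
    const ColumnType& type() const { return expr_eval_->root().type(); }

    // AppendRow(): materialize this column's output value from the input row,
    // then encode it into the current data page.
    void* value = expr_eval_->GetValue(row);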

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h
index 8cbd456..b3d319e 100644
--- a/be/src/exec/hdfs-parquet-table-writer.h
+++ b/be/src/exec/hdfs-parquet-table-writer.h
@@ -51,11 +51,9 @@ class TupleRow;
 
 class HdfsParquetTableWriter : public HdfsTableWriter {
  public:
-  HdfsParquetTableWriter(HdfsTableSink* parent,
-                         RuntimeState* state, OutputPartition* output_partition,
-                         const HdfsPartitionDescriptor* part_desc,
-                         const HdfsTableDescriptor* table_desc,
-                         const std::vector<ExprContext*>& output_expr_ctxs);
+  HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
+      OutputPartition* output_partition, const HdfsPartitionDescriptor* part_desc,
+      const HdfsTableDescriptor* table_desc);
 
   ~HdfsParquetTableWriter();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index eb78fb9..33b62dd 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -23,7 +23,6 @@
 #include "exec/hdfs-sequence-scanner.h"
 #include "exec/scanner-context.inline.h"
 #include "exec/text-converter.inline.h"
-#include "exprs/expr.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-pool.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 0e7c319..eeb7cfe 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -33,7 +33,8 @@
 #include "codegen/llvm-codegen.h"
 #include "common/logging.h"
 #include "common/object-pool.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/descriptors.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/runtime-filter.inline.h"
@@ -80,14 +81,18 @@ const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC =
 const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
 
 HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
-                           const DescriptorTbl& descs)
+    const DescriptorTbl& descs)
     : ScanNode(pool, tnode, descs),
       min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
           tnode.hdfs_scan_node.min_max_tuple_id : -1),
       skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
           tnode.hdfs_scan_node.skip_header_line_count : 0),
       tuple_id_(tnode.hdfs_scan_node.tuple_id),
-      disks_accessed_bitmap_(TUnit::UNIT, 0){
+      tuple_desc_(descs.GetTupleDescriptor(tuple_id_)),
+      thrift_dict_filter_conjuncts_map_(
+          tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ?
+          &tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
+      disks_accessed_bitmap_(TUnit::UNIT, 0) {
 }
 
 HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -98,16 +103,21 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
 
   // Add collection item conjuncts
   for (const auto& entry: tnode.hdfs_scan_node.collection_conjuncts) {
-    DCHECK(conjuncts_map_[entry.first].empty());
-    RETURN_IF_ERROR(
-        Expr::CreateExprTrees(pool_, entry.second, &conjuncts_map_[entry.first]));
+    TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first);
+    RowDescriptor* collection_row_desc = state->obj_pool()->Add(
+        new RowDescriptor(tuple_desc, /* is_nullable */ false));
+    DCHECK(conjuncts_map_.find(entry.first) == conjuncts_map_.end());
+    RETURN_IF_ERROR(ScalarExpr::Create(entry.second, *collection_row_desc, state,
+        &conjuncts_map_[entry.first]));
   }
+  DCHECK(conjuncts_map_[tuple_id_].empty());
+  conjuncts_map_[tuple_id_] = conjuncts_;
 
   const TQueryOptions& query_options = state->query_options();
-  for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
-    auto it = filter.planid_to_target_ndx.find(tnode.node_id);
-    DCHECK(it != filter.planid_to_target_ndx.end());
-    const TRuntimeFilterTargetDesc& target = filter.targets[it->second];
+  for (const TRuntimeFilterDesc& filter_desc : tnode.runtime_filters) {
+    auto it = filter_desc.planid_to_target_ndx.find(tnode.node_id);
+    DCHECK(it != filter_desc.planid_to_target_ndx.end());
+    const TRuntimeFilterTargetDesc& target = filter_desc.targets[it->second];
     if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL &&
         !target.is_local_target) {
       continue;
@@ -116,40 +126,32 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
         !target.is_bound_by_partition_columns) {
       continue;
     }
-
-    FilterContext filter_ctx;
+    ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
-        Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr_ctx));
-    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false);
-
-    string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id,
+        ScalarExpr::Create(target.target_expr, row_desc(), state, &filter_expr));
+    filter_exprs_.push_back(filter_expr);
+
+    // TODO: Move this to Prepare()
+    filter_ctxs_.emplace_back();
+    FilterContext& filter_ctx = filter_ctxs_.back();
+    filter_ctx.filter = state->filter_bank()->RegisterFilter(filter_desc, false);
+    string filter_profile_title = Substitute("Filter $0 ($1)", filter_desc.filter_id,
         PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES));
     RuntimeProfile* profile = state->obj_pool()->Add(
         new RuntimeProfile(state->obj_pool(), filter_profile_title));
     runtime_profile_->AddChild(profile);
     filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile,
         target.is_bound_by_partition_columns));
-
-    filter_ctxs_.push_back(filter_ctx);
   }
 
-  // Add row batch conjuncts
-  DCHECK(conjuncts_map_[tuple_id_].empty());
-  conjuncts_map_[tuple_id_] = conjunct_ctxs_;
-
   // Add min max conjuncts
-  RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, tnode.hdfs_scan_node.min_max_conjuncts,
-      &min_max_conjunct_ctxs_));
-  DCHECK(min_max_conjunct_ctxs_.empty() == (min_max_tuple_id_ == -1));
-
-  for (const auto& entry: tnode.hdfs_scan_node.dictionary_filter_conjuncts) {
-    // Convert this slot's list of conjunct indices into a list of pointers
-    // into conjunct_ctxs_.
-    for (int conjunct_idx : entry.second) {
-      DCHECK_LT(conjunct_idx, conjunct_ctxs_.size());
-      ExprContext* conjunct_ctx = conjunct_ctxs_[conjunct_idx];
-      dict_filter_conjuncts_map_[entry.first].push_back(conjunct_ctx);
-    }
+  if (min_max_tuple_id_ != -1) {
+    min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_);
+    DCHECK(min_max_tuple_desc_ != nullptr);
+    RowDescriptor* min_max_row_desc = state->obj_pool()->Add(
+        new RowDescriptor(min_max_tuple_desc_, /* is_nullable */ false));
+    RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
+        *min_max_row_desc, state, &min_max_conjuncts_));
   }
 
   return Status::OK();
@@ -161,41 +163,37 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   runtime_state_ = state;
   RETURN_IF_ERROR(ScanNode::Prepare(state));
 
-  tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
-  DCHECK(tuple_desc_ != NULL);
-
   // Prepare collection conjuncts
   for (const auto& entry: conjuncts_map_) {
     TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(entry.first);
     // conjuncts_ are already prepared in ExecNode::Prepare(), don't try to prepare again
-    if (tuple_desc == tuple_desc_) continue;
-    RowDescriptor* collection_row_desc =
-        state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false));
-    RETURN_IF_ERROR(
-        Expr::Prepare(entry.second, state, *collection_row_desc, expr_mem_tracker()));
+    if (tuple_desc == tuple_desc_) {
+      conjunct_evals_map_[entry.first] = conjunct_evals();
+    } else {
+      DCHECK(conjunct_evals_map_[entry.first].empty());
+      RETURN_IF_ERROR(ScalarExprEvaluator::Create(entry.second, state, pool_,
+          expr_mem_pool(), &conjunct_evals_map_[entry.first]));
+    }
+  }
+
+  DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
+  for (int i = 0; i < filter_exprs_.size(); ++i) {
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(*filter_exprs_[i], state, pool_,
+        expr_mem_pool(), &filter_ctxs_[i].expr_eval));
+    AddEvaluatorToFree(filter_ctxs_[i].expr_eval);
   }
 
   // Prepare min max statistics conjuncts.
   if (min_max_tuple_id_ != -1) {
-    min_max_tuple_desc_ = state->desc_tbl().GetTupleDescriptor(min_max_tuple_id_);
-    DCHECK(min_max_tuple_desc_ != NULL);
-    RowDescriptor* min_max_row_desc =
-        state->obj_pool()->Add(new RowDescriptor(min_max_tuple_desc_, /* is_nullable */
-        false));
-    RETURN_IF_ERROR(Expr::Prepare(min_max_conjunct_ctxs_, state, *min_max_row_desc,
-        expr_mem_tracker()));
+    RETURN_IF_ERROR(ScalarExprEvaluator::Create(min_max_conjuncts_, state, pool_,
+        expr_mem_pool(), &min_max_conjunct_evals_));
   }
 
-  // One-time initialisation of state that is constant across scan ranges
+  // One-time initialization of state that is constant across scan ranges
   DCHECK(tuple_desc_->table_desc() != NULL);
   hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc());
   scan_node_pool_.reset(new MemPool(mem_tracker()));
 
-  for (FilterContext& filter: filter_ctxs_) {
-    RETURN_IF_ERROR(filter.expr_ctx->Prepare(state, row_desc(), expr_mem_tracker()));
-    AddExprCtxToFree(filter.expr_ctx);
-  }
-
   // Parse Avro table schema if applicable
   const string& avro_schema_str = hdfs_table_->avro_schema();
   if (!avro_schema_str.empty()) {
@@ -331,16 +329,16 @@ void HdfsScanNodeBase::Codegen(RuntimeState* state) {
     Status status;
     switch (format) {
       case THdfsFileFormat::TEXT:
-        status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn);
+        status = HdfsTextScanner::Codegen(this, conjuncts_, &fn);
         break;
       case THdfsFileFormat::SEQUENCE_FILE:
-        status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn);
+        status = HdfsSequenceScanner::Codegen(this, conjuncts_, &fn);
         break;
       case THdfsFileFormat::AVRO:
-        status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn);
+        status = HdfsAvroScanner::Codegen(this, conjuncts_, &fn);
         break;
       case THdfsFileFormat::PARQUET:
-        status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, filter_ctxs_, &fn);
+        status = HdfsParquetScanner::Codegen(this, conjuncts_, &fn);
         break;
       default:
         // No codegen for this format
@@ -363,16 +361,19 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Open(state));
 
   // Open collection conjuncts
-  for (const auto& entry: conjuncts_map_) {
+  for (auto& entry: conjunct_evals_map_) {
     // conjuncts_ are already opened in ExecNode::Open()
     if (entry.first == tuple_id_) continue;
-    RETURN_IF_ERROR(Expr::Open(entry.second, state));
+    RETURN_IF_ERROR(ScalarExprEvaluator::Open(entry.second, state));
   }
 
   // Open min max conjuncts
-  RETURN_IF_ERROR(Expr::Open(min_max_conjunct_ctxs_, state));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(min_max_conjunct_evals_, state));
 
-  for (FilterContext& filter: filter_ctxs_) RETURN_IF_ERROR(filter.expr_ctx->Open(state));
+  // Open Runtime filter expressions.
+  for (FilterContext& ctx : filter_ctxs_) {
+    RETURN_IF_ERROR(ctx.expr_eval->Open(state));
+  }
 
   // Create template tuples for all partitions.
   for (int64_t partition_id: partition_ids_) {
@@ -381,7 +382,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
                                    << " partition_id=" << partition_id
                                    << "\n" << PrintThrift(state->instance_ctx());
     partition_template_tuple_map_[partition_id] = InitTemplateTuple(
-        partition_desc->partition_key_value_ctxs(), scan_node_pool_.get(), state);
+        partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
   }
 
   runtime_state_->io_mgr()->RegisterContext(&reader_context_, mem_tracker());
@@ -476,15 +477,22 @@ void HdfsScanNodeBase::Close(RuntimeState* state) {
   if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
 
   // Close collection conjuncts
-  for (const auto& tid_conjunct: conjuncts_map_) {
+  for (auto& tid_conjunct: conjuncts_map_) {
     // conjuncts_ are already closed in ExecNode::Close()
     if (tid_conjunct.first == tuple_id_) continue;
-    Expr::Close(tid_conjunct.second, state);
+    ScalarExprEvaluator::Close(conjunct_evals_map_[tid_conjunct.first], state);
+    ScalarExpr::Close(tid_conjunct.second);
   }
 
-  Expr::Close(min_max_conjunct_ctxs_, state);
+  // Close min max conjunct
+  ScalarExprEvaluator::Close(min_max_conjunct_evals_, state);
+  ScalarExpr::Close(min_max_conjuncts_);
 
-  for (auto& filter_ctx: filter_ctxs_) filter_ctx.expr_ctx->Close(state);
+  // Close filter
+  for (auto& filter_ctx : filter_ctxs_) {
+    if (filter_ctx.expr_eval != nullptr) filter_ctx.expr_eval->Close(state);
+  }
+  ScalarExpr::Close(filter_exprs_);
   ScanNode::Close(state);
 }
 
@@ -529,8 +537,9 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
   return Status::OK();
 }
 
-bool HdfsScanNodeBase::FilePassesFilterPredicates(const vector<FilterContext>& filter_ctxs,
-    const THdfsFileFormat::type& format, HdfsFileDesc* file) {
+bool HdfsScanNodeBase::FilePassesFilterPredicates(
+    const vector<FilterContext>& filter_ctxs, const THdfsFileFormat::type& format,
+    HdfsFileDesc* file) {
 #ifndef NDEBUG
   if (FLAGS_skip_file_runtime_filtering) return true;
 #endif
@@ -689,15 +698,15 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
   return status;
 }
 
-Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ExprContext*>& value_ctxs,
+Tuple* HdfsScanNodeBase::InitTemplateTuple(const vector<ScalarExprEvaluator*>& evals,
     MemPool* pool, RuntimeState* state) const {
   if (partition_key_slots_.empty()) return NULL;
   Tuple* template_tuple = Tuple::Create(tuple_desc_->byte_size(), pool);
   for (int i = 0; i < partition_key_slots_.size(); ++i) {
     const SlotDescriptor* slot_desc = partition_key_slots_[i];
-    ExprContext* value_ctx = value_ctxs[slot_desc->col_pos()];
+    ScalarExprEvaluator* eval = evals[slot_desc->col_pos()];
     // Exprs guaranteed to be literals, so can safely be evaluated without a row.
-    RawValue::Write(value_ctx->GetValue(NULL), template_tuple, slot_desc, NULL);
+    RawValue::Write(eval->GetValue(NULL), template_tuple, slot_desc, NULL);
   }
   return template_tuple;
 }
@@ -775,7 +784,7 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
 }
 
 void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
-  const vector<ExprContext*>& conjuncts = ExecNode::conjunct_ctxs();
+  const vector<ScalarExpr*>& conjuncts = ExecNode::conjuncts();
   // Initialize all order to be conjuncts.size() (after the last conjunct)
   order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
 
@@ -784,7 +793,7 @@ void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const
   vector<SlotId> slot_ids;
   for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
     slot_ids.clear();
-    int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids);
+    int num_slots = conjuncts[conjunct_idx]->GetSlotIds(&slot_ids);
     for (int j = 0; j < num_slots; ++j) {
       SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]);
       int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index abe0fc6..071f1b6 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -143,8 +143,8 @@ class HdfsScanNodeBase : public ScanNode {
 
   int min_max_tuple_id() const { return min_max_tuple_id_; }
 
-  const std::vector<ExprContext*> min_max_conjunct_ctxs() const {
-    return min_max_conjunct_ctxs_;
+  const std::vector<ScalarExprEvaluator*>& min_max_conjunct_evals() const {
+    return min_max_conjunct_evals_;
   }
 
   const TupleDescriptor* min_max_tuple_desc() const { return min_max_tuple_desc_; }
@@ -155,13 +155,13 @@ class HdfsScanNodeBase : public ScanNode {
   int skip_header_line_count() const { return skip_header_line_count_; }
   DiskIoRequestContext* reader_context() { return reader_context_; }
 
-  typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
-  const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
+  typedef std::map<TupleId, std::vector<ScalarExprEvaluator*>> ConjunctEvaluatorsMap;
+  const ConjunctEvaluatorsMap& conjuncts_map() const { return conjunct_evals_map_; }
 
   /// Slot Id => Dictionary Filter eligible conjuncts for that slot
-  typedef std::map<SlotId, std::vector<ExprContext*>> DictFilterConjunctsMap;
-  const DictFilterConjunctsMap& dict_filter_conjuncts_map() const {
-    return dict_filter_conjuncts_map_;
+  typedef std::map<TSlotId, std::vector<int32_t>> TDictFilterConjunctsMap;
+  const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map() const {
+    return thrift_dict_filter_conjuncts_map_;
   }
 
   RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
@@ -227,7 +227,7 @@ class HdfsScanNodeBase : public ScanNode {
   /// Allocates and initializes a new template tuple allocated from pool with values
   /// from the partition columns for the current scan range, if any,
   /// Returns NULL if there are no partition keys slots.
-  Tuple* InitTemplateTuple(const std::vector<ExprContext*>& value_ctxs,
+  Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals,
       MemPool* pool, RuntimeState* state) const;
 
   /// Returns the file desc for 'filename'.  Returns NULL if filename is invalid.
@@ -293,6 +293,8 @@ class HdfsScanNodeBase : public ScanNode {
   bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name,
       const std::vector<FilterContext>& filter_ctxs);
 
+  const std::vector<ScalarExpr*>& filter_exprs() const { return filter_exprs_; }
+
   const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
 
  protected:
@@ -305,7 +307,8 @@ class HdfsScanNodeBase : public ScanNode {
   const int min_max_tuple_id_;
 
   /// Conjuncts to evaluate on parquet::Statistics.
-  vector<ExprContext*> min_max_conjunct_ctxs_;
+  vector<ScalarExpr*> min_max_conjuncts_;
+  vector<ScalarExprEvaluator*> min_max_conjunct_evals_;
 
   /// Descriptor for the tuple used to evaluate conjuncts on parquet::Statistics.
   TupleDescriptor* min_max_tuple_desc_ = nullptr;
@@ -353,10 +356,12 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// Conjuncts for each materialized tuple (top-level row batch tuples and collection
   /// item tuples). Includes a copy of ExecNode.conjuncts_.
+  typedef std::map<TupleId, std::vector<ScalarExpr*>> ConjunctsMap;
   ConjunctsMap conjuncts_map_;
+  ConjunctEvaluatorsMap conjunct_evals_map_;
 
-  /// Dictionary filtering eligible conjuncts for each slot
-  DictFilterConjunctsMap dict_filter_conjuncts_map_;
+  /// Dictionary filtering eligible conjuncts for each slot.
+  const TDictFilterConjunctsMap* thrift_dict_filter_conjuncts_map_;
 
   /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on
   /// the first call to GetNext(). The token manager, in a different thread, will read
@@ -374,9 +379,12 @@ class HdfsScanNodeBase : public ScanNode {
   typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap;
   PathToSlotIdxMap path_to_materialized_slot_idx_;
 
+  /// Expressions to evaluate the input rows for filtering against runtime filters.
+  std::vector<ScalarExpr*> filter_exprs_;
+
   /// List of contexts for expected runtime filters for this scan node. These contexts are
   /// cloned by individual scanners to be used in multi-threaded contexts, passed through
-  /// the per-scanner ScannerContext..
+  /// the per-scanner ScannerContext. Correspond to exprs in 'filter_exprs_'.
   std::vector<FilterContext> filter_ctxs_;
 
   /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 576ec55..4cbf503 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -362,12 +362,15 @@ void HdfsScanNode::ScannerThread() {
   SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
 
   // Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
-  // scanner for finer-grained filtering.
+  // scanner for finer-grained filtering. Use a thread-local MemPool for the filter
+  // contexts as the embedded expression evaluators may allocate from it and MemPool
+  // is not thread safe.
+  MemPool filter_mem_pool(expr_mem_tracker());
   vector<FilterContext> filter_ctxs;
   Status filter_status = Status::OK();
   for (auto& filter_ctx: filter_ctxs_) {
     FilterContext filter;
-    filter_status = filter.CloneFrom(filter_ctx, runtime_state_);
+    filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool);
     if (!filter_status.ok()) break;
     filter_ctxs.push_back(filter);
   }
@@ -386,14 +389,7 @@ void HdfsScanNode::ScannerThread() {
           // Unlock before releasing the thread token to avoid deadlock in
           // ThreadTokenAvailableCb().
           l.unlock();
-          runtime_state_->resource_pool()->ReleaseThreadToken(false);
-          if (filter_status.ok()) {
-            for (auto& ctx: filter_ctxs) {
-              ctx.expr_ctx->FreeLocalAllocations();
-              ctx.expr_ctx->Close(runtime_state_);
-            }
-          }
-          return;
+          goto exit;
         }
       } else {
         // If this is the only scanner thread, it should keep running regardless
@@ -458,16 +454,17 @@ void HdfsScanNode::ScannerThread() {
       break;
     }
   }
+  COUNTER_ADD(&active_scanner_thread_counter_, -1);
 
+exit:
+  runtime_state_->resource_pool()->ReleaseThreadToken(false);
   if (filter_status.ok()) {
     for (auto& ctx: filter_ctxs) {
-      ctx.expr_ctx->FreeLocalAllocations();
-      ctx.expr_ctx->Close(runtime_state_);
+      ctx.expr_eval->FreeLocalAllocations();
+      ctx.expr_eval->Close(runtime_state_);
     }
   }
-
-  COUNTER_ADD(&active_scanner_thread_counter_, -1);
-  runtime_state_->resource_pool()->ReleaseThreadToken(false);
+  filter_mem_pool.FreeAll();
 }
 
 namespace {
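
The scanner-thread change above collapses the two early-return cleanup paths into a single exit label and gives each thread its own MemPool for cloned filter contexts, since MemPool is not thread safe. Roughly, with the calls shown in this patch:

    MemPool filter_mem_pool(expr_mem_tracker());  // thread-local pool for the clones
    vector<FilterContext> filter_ctxs;
    for (auto& filter_ctx : filter_ctxs_) {
      FilterContext filter;
      filter_status = filter.CloneFrom(filter_ctx, pool_, runtime_state_, &filter_mem_pool);
      if (!filter_status.ok()) break;
      filter_ctxs.push_back(filter);
    }
    // ... scan ranges are processed here ...
    // Single exit path: close the cloned evaluators, then free everything the
    // clones allocated from the thread-local pool.
    for (auto& ctx : filter_ctxs) {
      ctx.expr_eval->FreeLocalAllocations();
      ctx.expr_eval->Close(runtime_state_);
    }
    filter_mem_pool.FreeAll();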

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 5b474d1..96aad07 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -76,8 +76,8 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_
   return tuples_returned;
 }
 
-ExprContext* HdfsScanner::GetConjunctCtx(int idx) const {
-  return (*scanner_conjunct_ctxs_)[idx];
+ScalarExprEvaluator* HdfsScanner::GetConjunctEval(int idx) const {
+  return (*conjunct_evals_)[idx];
 }
 
 // Define the string parsing functions for llvm.  Stamp out the templated functions

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index a529668..991c40f 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -29,7 +29,7 @@
 #include "exec/hdfs-scan-node-mt.h"
 #include "exec/read-write-util.h"
 #include "exec/text-converter.inline.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/descriptors.h"
 #include "runtime/hdfs-fs-cache.h"
@@ -64,7 +64,8 @@ HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
       stream_(NULL),
       eos_(false),
       is_closed_(false),
-      scanner_conjunct_ctxs_(NULL),
+      expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
+      conjunct_evals_(NULL),
       template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
       template_tuple_(NULL),
       tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
@@ -85,7 +86,7 @@ HdfsScanner::HdfsScanner()
       stream_(NULL),
       eos_(false),
       is_closed_(false),
-      scanner_conjunct_ctxs_(NULL),
+      conjunct_evals_(NULL),
       template_tuple_pool_(NULL),
       template_tuple_(NULL),
       tuple_byte_size_(-1),
@@ -107,25 +108,33 @@ Status HdfsScanner::Open(ScannerContext* context) {
   context_ = context;
   stream_ = context->GetStream();
 
-  // Clone the scan node's conjuncts map. The cloned contexts must be closed by the
+  // Clone the scan node's conjuncts map. The cloned evaluators must be closed by the
   // caller.
   for (const auto& entry: scan_node_->conjuncts_map()) {
-    RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second,
-        scan_node_->runtime_state(), &scanner_conjuncts_map_[entry.first]));
+    RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, scan_node_->runtime_state(),
+        expr_mem_pool_.get(), entry.second,
+        &conjunct_evals_map_[entry.first]));
   }
-  DCHECK(scanner_conjuncts_map_.find(scan_node_->tuple_desc()->id()) !=
-         scanner_conjuncts_map_.end());
-  scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()];
-
-  // Clone the scan node's dictionary filtering conjuncts map.
-  for (const auto& entry: scan_node_->dict_filter_conjuncts_map()) {
-    RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second,
-        scan_node_->runtime_state(), &scanner_dict_filter_map_[entry.first]));
+  DCHECK(conjunct_evals_map_.find(scan_node_->tuple_desc()->id()) !=
+         conjunct_evals_map_.end());
+  conjunct_evals_ = &conjunct_evals_map_[scan_node_->tuple_desc()->id()];
+
+  // Set up the scan node's dictionary filtering conjuncts map.
+  if (scan_node_->thrift_dict_filter_conjuncts_map() != nullptr) {
+    for (auto& entry : *(scan_node_->thrift_dict_filter_conjuncts_map())) {
+      // Convert this slot's list of conjunct indices into a list of pointers
+      // into conjunct_evals_.
+      for (int conjunct_idx : entry.second) {
+        DCHECK_LT(conjunct_idx, conjunct_evals_->size());
+        DCHECK((*conjunct_evals_)[conjunct_idx] != nullptr);
+        dict_filter_map_[entry.first].push_back((*conjunct_evals_)[conjunct_idx]);
+      }
+    }
   }
 
   // Initialize the template_tuple_.
   template_tuple_ = scan_node_->InitTemplateTuple(
-      context_->partition_descriptor()->partition_key_value_ctxs(),
+      context_->partition_descriptor()->partition_key_value_evals(),
       template_tuple_pool_.get(), state_);
   template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_;
 
@@ -139,8 +148,10 @@ void HdfsScanner::Close(RowBatch* row_batch) {
     decompressor_->Close();
     decompressor_.reset();
   }
-  for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_);
-  for (const auto& entry: scanner_dict_filter_map_) Expr::Close(entry.second, state_);
+  for (auto& entry : conjunct_evals_map_) {
+    ScalarExprEvaluator::Close(entry.second, state_);
+  }
+  expr_mem_pool_->FreeAll();
   obj_pool_.Clear();
   stream_ = NULL;
   context_->ClearStreams();
@@ -223,8 +234,8 @@ Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row
   RETURN_IF_ERROR(state_->GetQueryStatus());
   // Free local expr allocations for this thread to avoid accumulating too much
   // memory from evaluating the scanner conjuncts.
-  for (const auto& entry: scanner_conjuncts_map_) {
-    ExprContext::FreeLocalAllocations(entry.second);
+  for (const auto& entry: conjunct_evals_map_) {
+    ScalarExprEvaluator::FreeLocalAllocations(entry.second);
   }
   return Status::OK();
 }
@@ -232,7 +243,7 @@ Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row
 int HdfsScanner::WriteTemplateTuples(TupleRow* row, int num_tuples) {
   DCHECK_GE(num_tuples, 0);
   DCHECK_EQ(scan_node_->tuple_idx(), 0);
-  DCHECK_EQ(scanner_conjunct_ctxs_->size(), 0);
+  DCHECK_EQ(conjunct_evals_->size(), 0);
   if (num_tuples == 0 || template_tuple_ == NULL) return num_tuples;
 
   Tuple** row_tuple = reinterpret_cast<Tuple**>(row);
@@ -306,10 +317,10 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
 //  %error_in_row2 = or i1 false, %slot_parse_error
 //  %3 = zext i1 %slot_parse_error to i8
 //  store i8 %3, i8* %slot_error_ptr
-//  %4 = call %"class.impala::ExprContext"* @GetConjunctCtx(
+//  %4 = call %"class.impala::ScalarExprEvaluator"* @GetConjunctCtx(
 //    %"class.impala::HdfsScanner"* %this, i32 0)
 //  %conjunct_eval = call i16 @"impala::Operators::Eq_StringVal_StringValWrapper"(
-//    %"class.impala::ExprContext"* %4, %"class.impala::TupleRow"* %tuple_row)
+//    %"class.impala::ScalarExprEvaluator"* %4, %"class.impala::TupleRow"* %tuple_row)
 //  %5 = ashr i16 %conjunct_eval, 8
 //  %6 = trunc i16 %5 to i8
 //  %val = trunc i8 %6 to i1
@@ -324,7 +335,7 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields,
 //   ret i1 false
 // }
 Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
-    LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs,
+    LlvmCodeGen* codegen, const vector<ScalarExpr*>& conjuncts,
     Function** write_complete_tuple_fn) {
   *write_complete_tuple_fn = NULL;
   SCOPED_TIMER(codegen->codegen_timer());
@@ -436,7 +447,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
   builder.CreateBr(parse_block);
 
   // Loop through all the conjuncts in order and materialize slots as necessary to
-  // evaluate the conjuncts (e.g. conjunct_ctxs[0] will have the slots it references
+  // evaluate the conjuncts (e.g. conjuncts[0] will have the slots it references
   // first).
   // materialized_order[slot_idx] represents the first conjunct which needs that slot.
   // Slots are only materialized if their order matches the current conjunct being
@@ -444,7 +455,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
   // needed and that at the end of the materialize loop, the conjunct has everything
   // it needs (either from this iteration or previous iterations).
   builder.SetInsertPoint(parse_block);
-  for (int conjunct_idx = 0; conjunct_idx <= conjunct_ctxs.size(); ++conjunct_idx) {
+  for (int conjunct_idx = 0; conjunct_idx <= conjuncts.size(); ++conjunct_idx) {
     for (int slot_idx = 0; slot_idx < materialize_order.size(); ++slot_idx) {
       // If they don't match, it means either the slot has already been
       // materialized for a previous conjunct or will be materialized later for
@@ -452,7 +463,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
       // yet.
       if (materialize_order[slot_idx] != conjunct_idx) continue;
 
-      // Materialize slots[slot_idx] to evaluate conjunct_ctxs[conjunct_idx]
+      // Materialize slots[slot_idx] to evaluate conjuncts[conjunct_idx]
       // All slots[i] with materialized_order[i] < conjunct_idx have already been
       // materialized by prior iterations through the outer loop
 
@@ -500,7 +511,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
       builder.CreateStore(slot_error, error_ptr);
     }
 
-    if (conjunct_idx == conjunct_ctxs.size()) {
+    if (conjunct_idx == conjuncts.size()) {
       // In this branch, we've just materialized slots not referenced by any conjunct.
       // These slots are the last to get materialized. If we are in this branch, the
       // tuple passed all conjuncts and should be added to the row batch.
@@ -508,12 +519,12 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
       builder.CreateStore(error_ret, error_in_row_arg);
       builder.CreateRet(codegen->true_value());
     } else {
-      // All slots for conjunct_ctxs[conjunct_idx] are materialized, evaluate the partial
+      // All slots for conjuncts[conjunct_idx] are materialized, evaluate the partial
       // tuple against that conjunct and start a new parse_block for the next conjunct
       parse_block = BasicBlock::Create(context, "parse", fn, eval_fail_block);
       Function* conjunct_fn;
       Status status =
-          conjunct_ctxs[conjunct_idx]->root()->GetCodegendComputeFn(codegen, &conjunct_fn);
+          conjuncts[conjunct_idx]->GetCodegendComputeFn(codegen, &conjunct_fn);
       if (!status.ok()) {
         stringstream ss;
         ss << "Failed to codegen conjunct: " << status.GetDetail();
@@ -526,12 +537,12 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
         codegen->SetNoInline(conjunct_fn);
       }
 
-      Function* get_ctx_fn =
-          codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_CTX, false);
-      Value* ctx = builder.CreateCall(get_ctx_fn,
+      Function* get_eval_fn =
+          codegen->GetFunction(IRFunction::HDFS_SCANNER_GET_CONJUNCT_EVALUATOR, false);
+      Value* eval = builder.CreateCall(get_eval_fn,
           ArrayRef<Value*>({this_arg, codegen->GetIntConstant(TYPE_INT, conjunct_idx)}));
 
-      Value* conjunct_args[] = {ctx, tuple_row_arg};
+      Value* conjunct_args[] = {eval, tuple_row_arg};
       CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(
           codegen, &builder, TYPE_BOOLEAN, conjunct_fn, conjunct_args, "conjunct_eval");
       builder.CreateCondBr(result.GetVal(), parse_block, eval_fail_block);
@@ -543,7 +554,7 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node,
   builder.SetInsertPoint(eval_fail_block);
   builder.CreateRet(codegen->false_value());
 
-  if (node->materialized_slots().size() + conjunct_ctxs.size()
+  if (node->materialized_slots().size() + conjuncts.size()
       > LlvmCodeGen::CODEGEN_INLINE_EXPR_BATCH_THRESHOLD) {
     codegen->SetNoInline(fn);
   }
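
The codegen loop above interleaves slot materialization with conjunct evaluation. A
plain, non-codegen sketch of the same control flow may help; the names and callable
types are illustrative stand-ins for what the generated WriteCompleteTuple() IR does,
not the Impala implementation:

#include <functional>
#include <vector>

// Materialize only the slots a conjunct needs, evaluate it, and bail out early on
// failure. materialized_order[i] is the first conjunct that needs slot i.
bool WriteCompleteTupleSketch(
    const std::vector<std::function<void()>>& materialize_slot,
    const std::vector<int>& materialized_order,
    const std::vector<std::function<bool()>>& conjuncts) {
  for (int conjunct_idx = 0; conjunct_idx <= static_cast<int>(conjuncts.size());
       ++conjunct_idx) {
    // Materialize every slot whose first use is this conjunct.
    for (int slot_idx = 0; slot_idx < static_cast<int>(materialized_order.size());
         ++slot_idx) {
      if (materialized_order[slot_idx] != conjunct_idx) continue;
      materialize_slot[slot_idx]();
    }
    // Past the last conjunct we only materialized leftover slots; the tuple survived.
    if (conjunct_idx == static_cast<int>(conjuncts.size())) return true;
    // Otherwise evaluate the conjunct whose inputs are now all in place.
    if (!conjuncts[conjunct_idx]()) return false;
  }
  return true;  // unreachable; the loop always returns above
}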

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 035b41c..03ca624 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -206,16 +206,22 @@ class HdfsScanner {
   /// Starts as false and is set to true in Close().
   bool is_closed_;
 
-  /// Clones of the conjuncts ExprContexts in scan_node_->conjuncts_map(). Each scanner
-  /// has its own ExprContexts so the conjuncts can be safely evaluated in parallel.
-  HdfsScanNodeBase::ConjunctsMap scanner_conjuncts_map_;
+  /// MemPool used for expression evaluators in this scanner. Needs to be local
+  /// to each scanner as MemPool is not thread-safe.
+  boost::scoped_ptr<MemPool> expr_mem_pool_;
 
-  // Convenience reference to scanner_conjuncts_map_[scan_node_->tuple_idx()] for scanners
-  // that do not support nested types.
-  const std::vector<ExprContext*>* scanner_conjunct_ctxs_;
+  /// Clones of the conjuncts' evaluators in scan_node_->conjuncts_map().
+  /// Each scanner has its own ScalarExprEvaluators so the conjuncts can be safely
+  /// evaluated in parallel.
+  HdfsScanNodeBase::ConjunctEvaluatorsMap conjunct_evals_map_;
 
-  // Clones of the conjuncts ExprContexts in scan_node_->dict_filter_conjuncts_map().
-  HdfsScanNodeBase::DictFilterConjunctsMap scanner_dict_filter_map_;
+  // Convenience reference to conjunct_evals_map_[scan_node_->tuple_idx()] for
+  // scanners that do not support nested types.
+  const std::vector<ScalarExprEvaluator*>* conjunct_evals_;
+
+  // Clones of the conjuncts' evaluators in scan_node_->dict_filter_conjuncts_map().
+  typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> DictFilterConjunctsMap;
+  DictFilterConjunctsMap dict_filter_map_;
 
   /// Holds memory for template tuples. The memory in this pool must remain valid as long
   /// as the row batches produced by this scanner. This typically means that the
@@ -359,12 +365,12 @@ class HdfsScanner {
     return Status::OK();
   }
 
-  /// Convenience function for evaluating conjuncts using this scanner's ExprContexts.
+  /// Convenience function for evaluating conjuncts using this scanner's ScalarExprEvaluators.
   /// This must always be inlined so we can correctly replace the call to
   /// ExecNode::EvalConjuncts() during codegen.
   bool IR_ALWAYS_INLINE EvalConjuncts(TupleRow* row)  {
-    return ExecNode::EvalConjuncts(&(*scanner_conjunct_ctxs_)[0],
-                                   scanner_conjunct_ctxs_->size(), row);
+    return ExecNode::EvalConjuncts(&(*conjunct_evals_)[0],
+        conjunct_evals_->size(), row);
   }
 
   /// Sets 'num_tuples' template tuples in the batch that 'row' points to. Assumes the
@@ -436,7 +442,7 @@ class HdfsScanner {
   /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn'
   /// if codegen was successful or NULL otherwise.
   static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen,
-      const std::vector<ExprContext*>& conjunct_ctxs,
+      const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** write_complete_tuple_fn);
 
   /// Codegen function to replace WriteAlignedTuples.  WriteAlignedTuples is cross
@@ -512,10 +518,10 @@ class HdfsScanner {
     return reinterpret_cast<TupleRow*>(mem + sizeof(Tuple*));
   }
 
-  /// Simple wrapper around scanner_conjunct_ctxs_. Used in the codegen'd version of
+  /// Simple wrapper around conjunct_evals_[idx]. Used in the codegen'd version of
   /// WriteCompleteTuple() because it's easier than writing IR to access
-  /// scanner_conjunct_ctxs_.
-  ExprContext* GetConjunctCtx(int idx) const;
+  /// conjunct_evals_.
+  ScalarExprEvaluator* GetConjunctEval(int idx) const;
 
   /// Unit test constructor
   HdfsScanner();
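
For context on what the EvalConjuncts() helper above delegates to, here is a simplified
stand-in for a conjunct-evaluation loop; the real signature and null handling live in
ExecNode::EvalConjuncts(), so treat the names and types below as assumptions:

#include <functional>
#include <optional>
#include <vector>

struct TupleRow {};  // placeholder for the real tuple row

// Each "evaluator" returns an optional bool; std::nullopt models a NULL result.
using ConjunctEval = std::function<std::optional<bool>(const TupleRow&)>;

// Every conjunct must yield a non-null TRUE for the row to pass; NULL counts as false.
bool EvalConjunctsSketch(const std::vector<ConjunctEval>& evals, const TupleRow& row) {
  for (const auto& eval : evals) {
    std::optional<bool> v = eval(row);
    if (!v.has_value() || !*v) return false;  // short-circuit on the first failing conjunct
  }
  return true;
}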

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 275f96b..355a554 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -53,13 +53,13 @@ HdfsSequenceScanner::~HdfsSequenceScanner() {
 
 // Codegen for materializing parsed data into tuples.
 Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
+    const vector<ScalarExpr*>& conjuncts, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = NULL;
   DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != NULL);
   Function* write_complete_tuple_fn;
-  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
+  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
       &write_complete_tuple_fn));
   DCHECK(write_complete_tuple_fn != NULL);
   RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
@@ -268,11 +268,11 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() {
         delimited_text_parser_->escape_char() == '\0');
     // last argument: seq always starts at record_location[0]
     tuples_returned = write_tuples_fn_(this, pool, tuple_row,
-        batch_->row_byte_size(), &field_locations_[0], num_to_process,
+        batch_->row_byte_size(), field_locations_.data(), num_to_process,
         max_added_tuples, scan_node_->materialized_slots().size(), 0);
   } else {
     tuples_returned = WriteAlignedTuples(pool, tuple_row,
-        batch_->row_byte_size(), &field_locations_[0], num_to_process,
+        batch_->row_byte_size(), field_locations_.data(), num_to_process,
         max_added_tuples, scan_node_->materialized_slots().size(), 0);
   }
 
@@ -321,17 +321,17 @@ Status HdfsSequenceScanner::ProcessRange() {
 
       RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(
           1, record_locations_[0].len, reinterpret_cast<char**>(&record_start),
-          &row_end_loc, &field_locations_[0], &num_tuples, &num_fields, &col_start));
+          &row_end_loc, field_locations_.data(), &num_tuples, &num_fields, &col_start));
       DCHECK(num_tuples == 1);
 
       uint8_t errors[num_fields];
       memset(errors, 0, num_fields);
 
-      add_row = WriteCompleteTuple(pool, &field_locations_[0], tuple_, tuple_row_mem,
+      add_row = WriteCompleteTuple(pool, field_locations_.data(), tuple_, tuple_row_mem,
           template_tuple_, &errors[0], &error_in_row);
 
       if (UNLIKELY(error_in_row)) {
-        ReportTupleParseError(&field_locations_[0], errors);
+        ReportTupleParseError(field_locations_.data(), errors);
         RETURN_IF_ERROR(parse_status_);
       }
     } else {
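
The &field_locations_[0] to field_locations_.data() changes in this file follow a general
C++ idiom: data() is well-defined even when the vector is empty, whereas indexing element 0
of an empty vector is undefined behavior. A small stand-alone illustration (not Impala code):

#include <cassert>
#include <cstddef>
#include <vector>

// Consumes a possibly-empty buffer; a null pointer with n == 0 is fine.
int CountNonZero(const int* buf, size_t n) {
  int count = 0;
  for (size_t i = 0; i < n; ++i) count += (buf[i] != 0);
  return count;
}

int main() {
  std::vector<int> v;  // empty
  // v.data() is valid (possibly nullptr) here; &v[0] would be undefined behavior.
  assert(CountNonZero(v.data(), v.size()) == 0);
  v = {1, 0, 2};
  assert(CountNonZero(v.data(), v.size()) == 2);
  return 0;
}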

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h
index 5d6074f..01d5a68 100644
--- a/be/src/exec/hdfs-sequence-scanner.h
+++ b/be/src/exec/hdfs-sequence-scanner.h
@@ -172,7 +172,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner {
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ExprContext*>& conjunct_ctxs,
+      const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** write_aligned_tuples_fn);
 
  protected:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc
index a05aa5d..37463f3 100644
--- a/be/src/exec/hdfs-sequence-table-writer.cc
+++ b/be/src/exec/hdfs-sequence-table-writer.cc
@@ -20,8 +20,8 @@
 #include "exec/exec-node.h"
 #include "util/hdfs-util.h"
 #include "util/uid-util.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
 #include "runtime/row-batch.h"
@@ -44,13 +44,11 @@ const char* HdfsSequenceTableWriter::KEY_CLASS_NAME =
     "org.apache.hadoop.io.BytesWritable";
 
 HdfsSequenceTableWriter::HdfsSequenceTableWriter(HdfsTableSink* parent,
-                        RuntimeState* state, OutputPartition* output,
-                        const HdfsPartitionDescriptor* partition,
-                        const HdfsTableDescriptor* table_desc,
-                        const vector<ExprContext*>& output_exprs)
-    : HdfsTableWriter(parent, state, output, partition, table_desc, output_exprs),
-      mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
-      unflushed_rows_(0), record_compression_(false) {
+    RuntimeState* state, OutputPartition* output,
+    const HdfsPartitionDescriptor* partition, const HdfsTableDescriptor* table_desc)
+  : HdfsTableWriter(parent, state, output, partition, table_desc),
+    mem_pool_(new MemPool(parent->mem_tracker())), compress_flag_(false),
+    unflushed_rows_(0), record_compression_(false) {
   approx_block_size_ = 64 * 1024 * 1024;
   parent->mem_tracker()->Consume(approx_block_size_);
   field_delim_ = partition->field_delim();
@@ -106,7 +104,8 @@ Status HdfsSequenceTableWriter::AppendRows(
   bool all_rows = row_group_indices.empty();
   int num_non_partition_cols =
       table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
+  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
+      << parent_->DebugString();
 
   {
     SCOPED_TIMER(parent_->encode_timer());
@@ -241,15 +240,16 @@ void HdfsSequenceTableWriter::EncodeRow(TupleRow* row, WriteStream* buf) {
   // TODO Unify with text table writer
   int num_non_partition_cols =
       table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
+  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
+      << parent_->DebugString();
   for (int j = 0; j < num_non_partition_cols; ++j) {
-    void* value = output_expr_ctxs_[j]->GetValue(row);
+    void* value = output_expr_evals_[j]->GetValue(row);
     if (value != NULL) {
-      if (output_expr_ctxs_[j]->root()->type().type == TYPE_STRING) {
+      if (output_expr_evals_[j]->root().type().type == TYPE_STRING) {
         WriteEscapedString(reinterpret_cast<const StringValue*>(value), &row_buf_);
       } else {
         string str;
-        output_expr_ctxs_[j]->PrintValue(value, &str);
+        output_expr_evals_[j]->PrintValue(value, &str);
         buf->WriteBytes(str.size(), str.data());
       }
     } else {
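
The EncodeRow() hunk above shows the evaluator-per-output-expression pattern: each
non-partition column is produced by one evaluator, with NULLs and string types handled
specially. A rough, self-contained sketch of that loop shape (placeholder types, not the
HdfsTableWriter API; the NULL text and delimiter handling are assumptions):

#include <functional>
#include <optional>
#include <string>
#include <vector>

// Placeholder for one output-expression evaluator: returns the column's textual value
// for a row, or std::nullopt for NULL.
using OutputEval = std::function<std::optional<std::string>(int row_id)>;

// Encodes one row as delimited text, mirroring the per-column loop in EncodeRow().
std::string EncodeRowSketch(const std::vector<OutputEval>& evals, int row_id,
                            char field_delim, const std::string& null_text) {
  std::string out;
  for (size_t j = 0; j < evals.size(); ++j) {
    std::optional<std::string> val = evals[j](row_id);
    out += val.has_value() ? *val : null_text;  // NULL columns get the table's null text
    if (j + 1 < evals.size()) out += field_delim;
  }
  return out;
}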

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-sequence-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h
index 3d72926..f315920 100644
--- a/be/src/exec/hdfs-sequence-table-writer.h
+++ b/be/src/exec/hdfs-sequence-table-writer.h
@@ -100,11 +100,9 @@ struct OutputPartition;
 /// Output is buffered to fill sequence file blocks.
 class HdfsSequenceTableWriter : public HdfsTableWriter {
  public:
-  HdfsSequenceTableWriter(HdfsTableSink* parent,
-                          RuntimeState* state, OutputPartition* output,
-                          const HdfsPartitionDescriptor* partition,
-                          const HdfsTableDescriptor* table_desc,
-                          const std::vector<ExprContext*>& output_exprs);
+  HdfsSequenceTableWriter(HdfsTableSink* parent, RuntimeState* state,
+      OutputPartition* output, const HdfsPartitionDescriptor* partition,
+      const HdfsTableDescriptor* table_desc);
 
   ~HdfsSequenceTableWriter() { }
 
