http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index b49451a..e38439b 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -24,8 +24,8 @@
 #include "exec/exec-node.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "util/hdfs-util.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
@@ -56,8 +56,7 @@ namespace impala {
 const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
-HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc,
-    const vector<TExpr>& select_list_texprs, const TDataSink& tsink)
+HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, const TDataSink& tsink)
   : DataSink(row_desc),
     table_desc_(nullptr),
     default_partition_(nullptr),
@@ -66,8 +65,6 @@ HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc,
         tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count ?
             tsink.table_sink.hdfs_table_sink.skip_header_line_count :
             0),
-    select_list_texprs_(select_list_texprs),
-    partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs),
     overwrite_(tsink.table_sink.hdfs_table_sink.overwrite),
     input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered),
     sort_columns_(tsink.table_sink.hdfs_table_sink.sort_columns),
@@ -83,30 +80,12 @@ OutputPartition::OutputPartition()
     partition_descriptor(nullptr),
     block_size(0) {}
 
-Status HdfsTableSink::PrepareExprs(RuntimeState* state) {
-  // Prepare select list expressions.
-  // Disable codegen for these - they would be unused anyway.
-  // TODO: codegen table sink
-  RETURN_IF_ERROR(
-      Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
-  RETURN_IF_ERROR(
-      Expr::Prepare(partition_key_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
-
-  // Prepare partition key exprs and gather dynamic partition key exprs.
-  for (size_t i = 0; i < partition_key_expr_ctxs_.size(); ++i) {
-    // Remember non-constant partition key exprs for building hash table of Hdfs files.
-    if (!partition_key_expr_ctxs_[i]->root()->is_constant()) {
-      dynamic_partition_key_expr_ctxs_.push_back(partition_key_expr_ctxs_[i]);
-    }
-  }
-  // Sanity check.
-  DCHECK_LE(partition_key_expr_ctxs_.size(), table_desc_->num_cols())
-    << DebugString();
-  DCHECK_EQ(partition_key_expr_ctxs_.size(), table_desc_->num_clustering_cols())
-    << DebugString();
-  DCHECK_GE(output_expr_ctxs_.size(),
-      table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();
-
+Status HdfsTableSink::Init(const vector<TExpr>& thrift_output_exprs,
+    const TDataSink& tsink, RuntimeState* state) {
+  RETURN_IF_ERROR(DataSink::Init(thrift_output_exprs, tsink, state));
+  DCHECK(tsink.__isset.table_sink);
+  RETURN_IF_ERROR(ScalarExpr::Create(tsink.table_sink.hdfs_table_sink.partition_key_exprs,
+      row_desc_, state, &partition_key_exprs_));
   return Status::OK();
 }
 
@@ -114,6 +93,8 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
   SCOPED_TIMER(profile()->total_time_counter());
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(partition_key_exprs_, state,
+      state->obj_pool(), expr_mem_pool(), &partition_key_expr_evals_));
 
   // TODO: Consider a system-wide random number generator, initialised in a single place.
   ptime now = microsec_clock::local_time();
@@ -122,11 +103,6 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
   VLOG_QUERY << "Random seed: " << seed;
   srand(seed);
 
-  RETURN_IF_ERROR(Expr::CreateExprTrees(
-      state->obj_pool(), partition_key_texprs_, &partition_key_expr_ctxs_));
-  RETURN_IF_ERROR(Expr::CreateExprTrees(
-      state->obj_pool(), select_list_texprs_, &output_expr_ctxs_));
-
   // Resolve table id and set input tuple descriptor.
   table_desc_ = static_cast<const HdfsTableDescriptor*>(
       state->desc_tbl().GetTableDescriptor(table_id_));
@@ -140,16 +116,25 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
   staging_dir_ = Substitute("$0/_impala_insert_staging/$1", table_desc_->hdfs_base_dir(),
       PrintId(state->query_id(), "_"));
 
-  RETURN_IF_ERROR(PrepareExprs(state));
+  // Prepare partition key exprs and gather dynamic partition key exprs.
+  for (size_t i = 0; i < partition_key_expr_evals_.size(); ++i) {
+    // Remember non-constant partition key exprs for building hash table of Hdfs files.
+    if (!partition_key_expr_evals_[i]->root().is_constant()) {
+      dynamic_partition_key_expr_evals_.push_back(partition_key_expr_evals_[i]);
+    }
+  }
+  // Sanity check.
+  DCHECK_LE(partition_key_expr_evals_.size(), table_desc_->num_cols())
+      << DebugString();
+  DCHECK_EQ(partition_key_expr_evals_.size(), table_desc_->num_clustering_cols())
+      << DebugString();
+  DCHECK_GE(output_expr_evals_.size(),
+      table_desc_->num_cols() - table_desc_->num_clustering_cols()) << DebugString();
 
-  partitions_created_counter_ =
-      ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
-  files_created_counter_ =
-      ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT);
-  rows_inserted_counter_ =
-      ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT);
-  bytes_written_counter_ =
-      ADD_COUNTER(profile(), "BytesWritten", TUnit::BYTES);
+  partitions_created_counter_ = ADD_COUNTER(profile(), "PartitionsCreated", TUnit::UNIT);
+  files_created_counter_ = ADD_COUNTER(profile(), "FilesCreated", TUnit::UNIT);
+  rows_inserted_counter_ = ADD_COUNTER(profile(), "RowsInserted", TUnit::UNIT);
+  bytes_written_counter_ = ADD_COUNTER(profile(), "BytesWritten", TUnit::BYTES);
   encode_timer_ = ADD_TIMER(profile(), "EncodeTimer");
   hdfs_write_timer_ = ADD_TIMER(profile(), "HdfsWriteTimer");
   compress_timer_ = ADD_TIMER(profile(), "CompressTimer");
@@ -158,8 +143,9 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
 }
 
 Status HdfsTableSink::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
-  RETURN_IF_ERROR(Expr::Open(partition_key_expr_ctxs_, state));
+  RETURN_IF_ERROR(DataSink::Open(state));
+  DCHECK_EQ(partition_key_exprs_.size(), partition_key_expr_evals_.size());
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(partition_key_expr_evals_, state));
 
   // Get file format for default partition in table descriptor, and build a map from
   // partition key values to partition descriptor for multiple output format support. The
@@ -182,14 +168,15 @@ Status HdfsTableSink::Open(RuntimeState* state) {
       // Only relevant partitions are remembered in partition_descriptor_map_.
       bool relevant_partition = true;
       HdfsPartitionDescriptor* partition = id_to_desc.second;
-      DCHECK_EQ(partition->partition_key_value_ctxs().size(),
-                partition_key_expr_ctxs_.size());
-      vector<ExprContext*> dynamic_partition_key_value_ctxs;
-      for (size_t i = 0; i < partition_key_expr_ctxs_.size(); ++i) {
+      DCHECK_EQ(partition->partition_key_value_evals().size(),
+          partition_key_expr_evals_.size());
+      vector<ScalarExprEvaluator*> dynamic_partition_key_value_evals;
+      for (size_t i = 0; i < partition_key_expr_evals_.size(); ++i) {
         // Remember non-constant partition key exprs for building hash table of Hdfs files
-        if (!partition_key_expr_ctxs_[i]->root()->is_constant()) {
-          dynamic_partition_key_value_ctxs.push_back(
-              partition->partition_key_value_ctxs()[i]);
+        DCHECK(&partition_key_expr_evals_[i]->root() == partition_key_exprs_[i]);
+        if (!partition_key_exprs_[i]->is_constant()) {
+          dynamic_partition_key_value_evals.push_back(
+              partition->partition_key_value_evals()[i]);
         } else {
          // Deal with the following: one partition has (year=2009, month=3); another has
           // (year=2010, month=3).
@@ -198,9 +185,9 @@ Status HdfsTableSink::Open(RuntimeState* state) {
          // partition keys. So only keep a reference to the partition which matches
          // partition_key_values for constant values, since only that is written to.
           void* table_partition_key_value =
-              partition->partition_key_value_ctxs()[i]->GetValue(nullptr);
+              partition->partition_key_value_evals()[i]->GetValue(nullptr);
           void* target_partition_key_value =
-              partition_key_expr_ctxs_[i]->GetValue(nullptr);
+              partition_key_expr_evals_[i]->GetValue(nullptr);
           if (table_partition_key_value == nullptr
               && target_partition_key_value == nullptr) {
             continue;
@@ -208,7 +195,7 @@ Status HdfsTableSink::Open(RuntimeState* state) {
           if (table_partition_key_value == nullptr
               || target_partition_key_value == nullptr
              || !RawValue::Eq(table_partition_key_value, target_partition_key_value,
-                     partition_key_expr_ctxs_[i]->root()->type())) {
+                     partition_key_expr_evals_[i]->root().type())) {
             relevant_partition = false;
             break;
           }
@@ -218,7 +205,7 @@ Status HdfsTableSink::Open(RuntimeState* state) {
         string key;
        // Pass nullptr as row, since all of these expressions are constant, and can
         // therefore be evaluated without a valid row context.
-        GetHashTblKey(nullptr, dynamic_partition_key_value_ctxs, &key);
+        GetHashTblKey(nullptr, dynamic_partition_key_value_evals, &key);
        DCHECK(partition_descriptor_map_.find(key) == partition_descriptor_map_.end())
             << "Partitions with duplicate 'static' keys found during INSERT";
         partition_descriptor_map_[key] = partition;
@@ -291,14 +278,14 @@ Status HdfsTableSink::WriteRowsToPartition(
 
 Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) {
   DCHECK_GT(batch->num_rows(), 0);
-  DCHECK(!dynamic_partition_key_expr_ctxs_.empty());
+  DCHECK(!dynamic_partition_key_expr_evals_.empty());
   DCHECK(input_is_clustered_);
 
   // Initialize the clustered partition and key.
   if (current_clustered_partition_ == nullptr) {
     const TupleRow* current_row = batch->GetRow(0);
-    GetHashTblKey(
-        current_row, dynamic_partition_key_expr_ctxs_, &current_clustered_partition_key_);
+    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_,
+        &current_clustered_partition_key_);
     RETURN_IF_ERROR(GetOutputPartition(state, current_row,
        current_clustered_partition_key_, &current_clustered_partition_, false));
   }
@@ -306,8 +293,8 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
   // Compare the last row of the batch to the last current partition key. If they match,
   // then all the rows in the batch have the same key and can be written as a whole.
   string last_row_key;
-  GetHashTblKey(batch->GetRow(batch->num_rows() - 1), dynamic_partition_key_expr_ctxs_,
-      &last_row_key);
+  GetHashTblKey(batch->GetRow(batch->num_rows() - 1),
+      dynamic_partition_key_expr_evals_, &last_row_key);
   if (last_row_key == current_clustered_partition_key_) {
     DCHECK(current_clustered_partition_->second.empty());
    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_));
@@ -320,7 +307,7 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
     const TupleRow* current_row = batch->GetRow(i);
 
     string key;
-    GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key);
+    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key);
 
     if (current_clustered_partition_key_ != key) {
       DCHECK(current_clustered_partition_ != nullptr);
@@ -341,7 +328,7 @@ Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batc
     }
 #ifdef DEBUG
     string debug_row_key;
-    GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &debug_row_key);
+    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &debug_row_key);
     DCHECK_EQ(current_clustered_partition_key_, debug_row_key);
 #endif
     DCHECK(current_clustered_partition_ != nullptr);
@@ -435,15 +422,15 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
   // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/"
   // etc.
   stringstream partition_name_ss;
-  for (int j = 0; j < partition_key_expr_ctxs_.size(); ++j) {
+  for (int j = 0; j < partition_key_expr_evals_.size(); ++j) {
     partition_name_ss << table_desc_->col_descs()[j].name() << "=";
-    void* value = partition_key_expr_ctxs_[j]->GetValue(row);
+    void* value = partition_key_expr_evals_[j]->GetValue(row);
     // nullptr partition keys get a special value to be compatible with Hive.
     if (value == nullptr) {
       partition_name_ss << table_desc_->null_partition_key_value();
     } else {
       string value_str;
-      partition_key_expr_ctxs_[j]->PrintValue(value, &value_str);
+      partition_key_expr_evals_[j]->PrintValue(value, &value_str);
      // Directory names containing partition-key values need to be UrlEncoded, in
      // particular to avoid problems when '/' is part of the key value (which might
       // occur, for example, with date strings). Hive will URL decode the value
@@ -511,26 +498,22 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
     case THdfsFileFormat::TEXT:
       output_partition->writer.reset(
           new HdfsTextTableWriter(
-              this, state, output_partition, &partition_descriptor, table_desc_,
-              output_expr_ctxs_));
+              this, state, output_partition, &partition_descriptor, table_desc_));
       break;
     case THdfsFileFormat::PARQUET:
       output_partition->writer.reset(
           new HdfsParquetTableWriter(
-              this, state, output_partition, &partition_descriptor, table_desc_,
-              output_expr_ctxs_));
+              this, state, output_partition, &partition_descriptor, table_desc_));
       break;
     case THdfsFileFormat::SEQUENCE_FILE:
       output_partition->writer.reset(
           new HdfsSequenceTableWriter(
-              this, state, output_partition, &partition_descriptor, table_desc_,
-              output_expr_ctxs_));
+              this, state, output_partition, &partition_descriptor, table_desc_));
       break;
     case THdfsFileFormat::AVRO:
       output_partition->writer.reset(
           new HdfsAvroTableWriter(
-              this, state, output_partition, &partition_descriptor, table_desc_,
-              output_expr_ctxs_));
+              this, state, output_partition, &partition_descriptor, table_desc_));
       break;
     default:
       stringstream error_msg;
@@ -551,12 +534,12 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state,
   return CreateNewTmpFile(state, output_partition);
 }
 
-void HdfsTableSink::GetHashTblKey(
-    const TupleRow* row, const vector<ExprContext*>& ctxs, string* key) {
+void HdfsTableSink::GetHashTblKey(const TupleRow* row,
+    const vector<ScalarExprEvaluator*>& evals, string* key) {
   stringstream hash_table_key;
-  for (int i = 0; i < ctxs.size(); ++i) {
+  for (int i = 0; i < evals.size(); ++i) {
     RawValue::PrintValueAsBytes(
-        ctxs[i]->GetValue(row), ctxs[i]->root()->type(), &hash_table_key);
+        evals[i]->GetValue(row), evals[i]->root().type(), &hash_table_key);
     // Additionally append "/" to avoid accidental key collisions.
     hash_table_key << "/";
   }
@@ -616,14 +599,14 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const Tuple
 
 Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ExprContext::FreeLocalAllocations(output_expr_ctxs_);
-  ExprContext::FreeLocalAllocations(partition_key_expr_ctxs_);
+  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
+  ScalarExprEvaluator::FreeLocalAllocations(partition_key_expr_evals_);
   RETURN_IF_ERROR(state->CheckQueryState());
   // We don't do any work for an empty batch.
   if (batch->num_rows() == 0) return Status::OK();
 
   // If there are no partition keys then just pass the whole batch to one partition.
-  if (dynamic_partition_key_expr_ctxs_.empty()) {
+  if (dynamic_partition_key_expr_evals_.empty()) {
     // If there are no dynamic keys just use an empty key.
     PartitionPair* partition_pair;
     RETURN_IF_ERROR(
@@ -636,7 +619,7 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) {
       const TupleRow* current_row = batch->GetRow(i);
 
       string key;
-      GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key);
+      GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key);
       PartitionPair* partition_pair = nullptr;
       RETURN_IF_ERROR(
           GetOutputPartition(state, current_row, key, &partition_pair, false));
@@ -695,7 +678,7 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) {
   DCHECK(!closed_);
   SCOPED_TIMER(profile()->total_time_counter());
 
-  if (dynamic_partition_key_expr_ctxs_.empty()) {
+  if (dynamic_partition_key_expr_evals_.empty()) {
    // Make sure we create an output partition even if the input is empty because we need
     // it to delete the existing data for 'insert overwrite'.
     PartitionPair* dummy;
@@ -727,9 +710,8 @@ void HdfsTableSink::Close(RuntimeState* state) {
     if (!close_status.ok()) state->LogError(close_status.msg());
   }
   partition_keys_to_output_partitions_.clear();
-
-  Expr::Close(output_expr_ctxs_, state);
-  Expr::Close(partition_key_expr_ctxs_, state);
+  ScalarExprEvaluator::Close(partition_key_expr_evals_, state);
+  ScalarExpr::Close(partition_key_exprs_);
   DataSink::Close(state);
   closed_ = true;
 }
@@ -743,8 +725,9 @@ string HdfsTableSink::DebugString() const {
   stringstream out;
   out << "HdfsTableSink(overwrite=" << (overwrite_ ? "true" : "false")
       << " table_desc=" << table_desc_->DebugString()
-      << " partition_key_exprs=" << Expr::DebugString(partition_key_expr_ctxs_)
-      << " output_exprs=" << Expr::DebugString(output_expr_ctxs_)
+      << " partition_key_exprs="
+      << ScalarExpr::DebugString(partition_key_exprs_)
+      << " output_exprs=" << ScalarExpr::DebugString(output_exprs_)
       << ")";
   return out.str();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index b928a20..28581f9 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -127,8 +127,7 @@ struct OutputPartition {
 /// This is consistent with Hive's behavior.
 class HdfsTableSink : public DataSink {
  public:
-  HdfsTableSink(const RowDescriptor& row_desc,
-      const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink);
+  HdfsTableSink(const RowDescriptor& row_desc, const TDataSink& tsink);
 
   virtual std::string GetName() { return "HdfsTableSink"; }
 
@@ -163,6 +162,10 @@ class HdfsTableSink : public DataSink {
 
   std::string DebugString() const;
 
+ protected:
+  virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+      const TDataSink& tsink, RuntimeState* state) WARN_UNUSED_RESULT;
+
  private:
   /// Initialises the filenames of a given output partition, and opens the temporary file.
   /// The partition key is derived from 'row'. If the partition will not have any rows
@@ -193,8 +196,8 @@ class HdfsTableSink : public DataSink {
   /// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs,
   /// evaluated against 'row'. The generated string is much shorter than the full Hdfs
   /// file name.
-  void GetHashTblKey(
-      const TupleRow* row, const std::vector<ExprContext*>& ctxs, std::string* key);
+  void GetHashTblKey(const TupleRow* row,
+      const std::vector<ScalarExprEvaluator*>& evals, std::string* key);
 
   /// Given a hashed partition key, get the output partition structure from
   /// the 'partition_keys_to_output_partitions_'. 'no_more_rows' indicates that no more
@@ -203,9 +206,6 @@ class HdfsTableSink : public DataSink {
      const std::string& key, PartitionPair** partition_pair, bool no_more_rows)
       WARN_UNUSED_RESULT;
 
-  /// Initialise and prepare select and partition key expressions
-  Status PrepareExprs(RuntimeState* state) WARN_UNUSED_RESULT;
-
   /// Sets hdfs_file_name and tmp_hdfs_file_name of given output partition.
   /// The Hdfs directory is created from the target table's base Hdfs dir,
   /// the partition_key_names_ and the evaluated partition_key_exprs_.
@@ -246,9 +246,6 @@ class HdfsTableSink : public DataSink {
   /// Currently this is the default partition since we don't support multi-format sinks.
   const HdfsPartitionDescriptor* default_partition_;
 
-  /// Exprs that materialize output values
-  std::vector<ExprContext*> output_expr_ctxs_;
-
   /// Table id resolved in Prepare() to set tuple_desc_;
   TableId table_id_;
 
@@ -257,17 +254,6 @@ class HdfsTableSink : public DataSink {
   /// scanners while reading from the files.
   int skip_header_line_count_;
 
-  /// Thrift representation of select list exprs, saved in the constructor
-  /// to be used to initialise output_exprs_ in Init
-  const std::vector<TExpr>& select_list_texprs_;
-
-  /// Thrift representation of partition keys, saved in the constructor
-  /// to be used to initialise partition_key_exprs_ in Init
-  const std::vector<TExpr>& partition_key_texprs_;
-
-  /// Exprs of partition keys.
-  std::vector<ExprContext*> partition_key_expr_ctxs_;
-
   /// Indicates whether the existing partitions should be overwritten.
   bool overwrite_;
 
@@ -303,9 +289,13 @@ class HdfsTableSink : public DataSink {
   /// OutputPartition in the map to simplify the code.
   PartitionMap partition_keys_to_output_partitions_;
 
-  /// Subset of partition_key_expr_ctxs_ which are not constant. Set in Prepare().
+  /// Expressions for computing the target partitions to which a row is written.
+  std::vector<ScalarExpr*> partition_key_exprs_;
+  std::vector<ScalarExprEvaluator*> partition_key_expr_evals_;
+
+  /// Subset of partition_key_expr_evals_ which are not constant. Set in Prepare().
   /// Used for generating the string key of hash_tbl_.
-  std::vector<ExprContext*> dynamic_partition_key_expr_ctxs_;
+  std::vector<ScalarExprEvaluator*> dynamic_partition_key_expr_evals_;
 
   /// Map from row key (i.e. concatenated non-constant partition keys) to
   /// partition descriptor. We don't own the HdfsPartitionDescriptors, they

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc
index b84915a..edc4be8 100644
--- a/be/src/exec/hdfs-table-writer.cc
+++ b/be/src/exec/hdfs-table-writer.cc
@@ -27,18 +27,17 @@
 namespace impala {
 
 HdfsTableWriter::HdfsTableWriter(HdfsTableSink* parent,
-                                 RuntimeState* state, OutputPartition* output,
-                                 const HdfsPartitionDescriptor* partition_desc,
-                                 const HdfsTableDescriptor* table_desc,
-                                 const vector<ExprContext*>& output_expr_ctxs)
+    RuntimeState* state, OutputPartition* output,
+    const HdfsPartitionDescriptor* partition_desc, const HdfsTableDescriptor* table_desc)
   : parent_(parent),
     state_(state),
     output_(output),
     table_desc_(table_desc),
-    output_expr_ctxs_(output_expr_ctxs) {
+    output_expr_evals_(parent->output_expr_evals()) {
   int num_non_partition_cols =
       table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
+  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols)
+      << parent_->DebugString();
 }
 
 Status HdfsTableWriter::Write(const uint8_t* data, int32_t len) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index cad304f..cc08b00 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -27,13 +27,13 @@
 
 namespace impala {
 
-class RuntimeState;
-class OutputPartition;
-class ExprContext;
-class RowBatch;
 class HdfsPartitionDescriptor;
 class HdfsTableDescriptor;
 class HdfsTableSink;
+class OutputPartition;
+class RowBatch;
+class RuntimeState;
+class ScalarExprEvaluator;
 
 /// Pure virtual class for writing to hdfs table partition files.
 /// Subclasses implement the code needed to write to a specific file type.
@@ -46,12 +46,10 @@ class HdfsTableWriter {
   /// output_partition -- Information on the output partition file.
   /// partition -- the descriptor for the partition being written
   /// table_desc -- the descriptor for the table being written.
-  /// output_exprs -- expressions which generate the output values.
   HdfsTableWriter(HdfsTableSink* parent,
                   RuntimeState* state, OutputPartition* output_partition,
                   const HdfsPartitionDescriptor* partition_desc,
-                  const HdfsTableDescriptor* table_desc,
-                  const std::vector<ExprContext*>& output_expr_ctxs);
+                  const HdfsTableDescriptor* table_desc);
 
   virtual ~HdfsTableWriter() { }
 
@@ -129,8 +127,9 @@ class HdfsTableWriter {
   /// Table descriptor of table to be written.
   const HdfsTableDescriptor* table_desc_;
 
-  /// Expressions that materialize output values.
-  std::vector<ExprContext*> output_expr_ctxs_;
+  /// Reference to the evaluators of expressions which generate the output value.
+  /// The evaluators are owned by sink which owns this table writer.
+  const std::vector<ScalarExprEvaluator*>& output_expr_evals_;
 
   /// Subclass should populate any file format specific stats.
   TInsertStats stats_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 0a66460..c1a12d6 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -308,7 +308,7 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
         char* col = boundary_column_.buffer();
         int num_fields = 0;
         RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(),
-            &col, &num_fields, &field_locations_[0]));
+            &col, &num_fields, field_locations_.data()));
 
         TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
         int max_tuples = row_batch->capacity() - row_batch->num_rows();
@@ -376,8 +376,8 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
       SCOPED_TIMER(parse_delimiter_timer_);
       RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(max_tuples,
           byte_buffer_end_ - byte_buffer_ptr_, &byte_buffer_ptr_,
-          &row_end_locations_[0],
-          &field_locations_[0], num_tuples, &num_fields, &col_start));
+          row_end_locations_.data(), field_locations_.data(), num_tuples,
+          &num_fields, &col_start));
     }
 
     // Materialize the tuples into the in memory format for this query
@@ -387,7 +387,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
       // There can be one partial tuple which returned no more fields from this buffer.
       DCHECK_LE(*num_tuples, num_fields + 1);
       if (!boundary_column_.IsEmpty()) {
-        RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool));
+        RETURN_IF_ERROR(CopyBoundaryField(field_locations_.data(), pool));
         boundary_column_.Clear();
       }
       num_tuples_materialized = WriteFields(num_fields, *num_tuples, pool, tuple_row_mem);
@@ -742,13 +742,13 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) {
 // codegen'd using the IRBuilder for the specific tuple description.  This function
 // is then injected into the cross-compiled driving function, WriteAlignedTuples().
 Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node,
-    const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) {
+    const vector<ScalarExpr*>& conjuncts, Function** write_aligned_tuples_fn) {
   *write_aligned_tuples_fn = nullptr;
   DCHECK(node->runtime_state()->ShouldCodegen());
   LlvmCodeGen* codegen = node->runtime_state()->codegen();
   DCHECK(codegen != nullptr);
   Function* write_complete_tuple_fn;
-  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs,
+  RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts,
       &write_complete_tuple_fn));
   DCHECK(write_complete_tuple_fn != nullptr);
   RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn,
@@ -786,7 +786,7 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
     TupleRow* row) {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
 
-  FieldLocation* fields = &field_locations_[0];
+  FieldLocation* fields = field_locations_.data();
 
   int num_tuples_processed = 0;
   int num_tuples_materialized = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index 042cd18..af8fa8a 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -62,7 +62,7 @@ class HdfsTextScanner : public HdfsScanner {
   /// Codegen WriteAlignedTuples(). Stores the resulting function in
   /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise.
   static Status Codegen(HdfsScanNodeBase* node,
-      const std::vector<ExprContext*>& conjunct_ctxs,
+      const std::vector<ScalarExpr*>& conjuncts,
       llvm::Function** write_aligned_tuples_fn);
 
   /// Suffix for lzo index files.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc
index cba4032..82db972 100644
--- a/be/src/exec/hdfs-text-table-writer.cc
+++ b/be/src/exec/hdfs-text-table-writer.cc
@@ -17,8 +17,8 @@
 
 #include "exec/hdfs-text-table-writer.h"
 #include "exec/exec-node.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -45,12 +45,10 @@ static const int64_t COMPRESSED_BUFFERED_SIZE = 60 * 1024 * 1024;
 namespace impala {
 
 HdfsTextTableWriter::HdfsTextTableWriter(HdfsTableSink* parent,
-                                         RuntimeState* state, OutputPartition* output,
-                                         const HdfsPartitionDescriptor* partition,
-                                         const HdfsTableDescriptor* table_desc,
-                                         const vector<ExprContext*>& output_expr_ctxs)
-    : HdfsTableWriter(
-        parent, state, output, partition, table_desc, output_expr_ctxs) {
+    RuntimeState* state, OutputPartition* output,
+    const HdfsPartitionDescriptor* partition,
+    const HdfsTableDescriptor* table_desc)
+  : HdfsTableWriter(parent, state, output, partition, table_desc) {
   tuple_delim_ = partition->line_delim();
   field_delim_ = partition->field_delim();
   escape_char_ = partition->escape_char();
@@ -112,7 +110,7 @@ Status HdfsTextTableWriter::AppendRows(
   bool all_rows = row_group_indices.empty();
   int num_non_partition_cols =
       table_desc_->num_cols() - table_desc_->num_clustering_cols();
-  DCHECK_GE(output_expr_ctxs_.size(), num_non_partition_cols) << parent_->DebugString();
+  DCHECK_GE(output_expr_evals_.size(), num_non_partition_cols) << parent_->DebugString();
 
   {
     SCOPED_TIMER(parent_->encode_timer());
@@ -126,9 +124,9 @@ Status HdfsTextTableWriter::AppendRows(
       // partition col exprs are the last in output exprs, it's ok to just write
       // the first num_non_partition_cols values.
       for (int j = 0; j < num_non_partition_cols; ++j) {
-        void* value = output_expr_ctxs_[j]->GetValue(current_row);
+        void* value = output_expr_evals_[j]->GetValue(current_row);
         if (value != NULL) {
-          const ColumnType& type = output_expr_ctxs_[j]->root()->type();
+          const ColumnType& type = output_expr_evals_[j]->root().type();
           if (type.type == TYPE_CHAR) {
             char* val_ptr = StringValue::CharSlotToPtr(value, type);
            StringValue sv(val_ptr, StringValue::UnpaddedCharLength(val_ptr, type.len));
@@ -136,7 +134,7 @@ Status HdfsTextTableWriter::AppendRows(
           } else if (type.IsVarLenStringType()) {
             PrintEscaped(reinterpret_cast<const StringValue*>(value));
           } else {
-            output_expr_ctxs_[j]->PrintValue(value, &rowbatch_stringstream_);
+            output_expr_evals_[j]->PrintValue(value, &rowbatch_stringstream_);
           }
         } else {
          // NULLs in hive are encoded based on the 'serialization.null.format' property.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/hdfs-text-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-table-writer.h b/be/src/exec/hdfs-text-table-writer.h
index 2944f23..589ed23 100644
--- a/be/src/exec/hdfs-text-table-writer.h
+++ b/be/src/exec/hdfs-text-table-writer.h
@@ -43,10 +43,9 @@ class TupleRow;
 class HdfsTextTableWriter : public HdfsTableWriter {
  public:
   HdfsTextTableWriter(HdfsTableSink* parent,
-                      RuntimeState* state, OutputPartition* output,
-                      const HdfsPartitionDescriptor* partition,
-                      const HdfsTableDescriptor* table_desc,
-                      const std::vector<ExprContext*>& output_expr_ctxs);
+      RuntimeState* state, OutputPartition* output,
+      const HdfsPartitionDescriptor* partition,
+      const HdfsTableDescriptor* table_desc);
 
   ~HdfsTextTableWriter() { }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.cc b/be/src/exec/kudu-scan-node-base.cc
index c8a7870..d587660 100644
--- a/be/src/exec/kudu-scan-node-base.cc
+++ b/be/src/exec/kudu-scan-node-base.cc
@@ -137,8 +137,4 @@ void KuduScanNodeBase::StopAndFinalizeCounters() {
   PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
 }
 
-Status KuduScanNodeBase::GetConjunctCtxs(vector<ExprContext*>* ctxs) {
-  return Expr::CloneIfNotExists(conjunct_ctxs_, runtime_state_, ctxs);
-}
-
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-base.h b/be/src/exec/kudu-scan-node-base.h
index 70e5b94..49af13c 100644
--- a/be/src/exec/kudu-scan-node-base.h
+++ b/be/src/exec/kudu-scan-node-base.h
@@ -98,10 +98,6 @@ class KuduScanNodeBase : public ScanNode {
   static const std::string KUDU_ROUND_TRIPS;
   static const std::string KUDU_REMOTE_TOKENS;
 
-  /// Returns a cloned copy of the scan node's conjuncts. Requires that the expressions
-  /// have been open previously.
-  Status GetConjunctCtxs(vector<ExprContext*>* ctxs);
-
   const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
   kudu::client::KuduClient* kudu_client() { return client_; }
   RuntimeProfile::Counter* kudu_round_trips() const { return kudu_round_trips_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index beead44..08a3339 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -21,6 +21,7 @@
 
 #include "exec/kudu-scanner.h"
 #include "exec/kudu-util.h"
+#include "exprs/scalar-expr.h"
 #include "gutil/gscoped_ptr.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-pool.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 3cae4af..3fb0a18 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -22,9 +22,9 @@
 #include <vector>
 #include <string>
 
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
 #include "exec/kudu-util.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.h"
@@ -63,6 +63,7 @@ const string MODE_READ_AT_SNAPSHOT = "READ_AT_SNAPSHOT";
 KuduScanner::KuduScanner(KuduScanNodeBase* scan_node, RuntimeState* state)
   : scan_node_(scan_node),
     state_(state),
+    expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())),
     cur_kudu_batch_num_read_(0),
     last_alive_time_micros_(0) {
 }
@@ -73,7 +74,8 @@ Status KuduScanner::Open() {
     if (slot->type().type != TYPE_TIMESTAMP) continue;
     timestamp_slots_.push_back(slot);
   }
-  return scan_node_->GetConjunctCtxs(&conjunct_ctxs_);
+  return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_mem_pool_.get(),
+      scan_node_->conjunct_evals(), &conjunct_evals_);
 }
 
 void KuduScanner::KeepKuduScannerAlive() {
@@ -128,7 +130,8 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
 
 void KuduScanner::Close() {
   if (scanner_) CloseCurrentClientScanner();
-  Expr::Close(conjunct_ctxs_, state_);
+  ScalarExprEvaluator::Close(conjunct_evals_, state_);
+  expr_mem_pool_->FreeAll();
 }
 
 Status KuduScanner::OpenNextScanToken(const string& scan_token)  {
@@ -185,7 +188,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
 
   // Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into
   // 'row_batch'.
-  bool has_conjuncts = !conjunct_ctxs_.empty();
+  bool has_conjuncts = !conjunct_evals_.empty();
   int num_rows = cur_kudu_batch_.NumRows();
 
   for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) {
@@ -223,8 +226,8 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
     // Evaluate the conjuncts that haven't been pushed down to Kudu. Conjunct evaluation
     // is performed directly on the Kudu tuple because its memory layout is identical to
     // Impala's. We only copy the surviving tuples to Impala's output row batch.
-    if (has_conjuncts && !ExecNode::EvalConjuncts(&conjunct_ctxs_[0],
-        conjunct_ctxs_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) {
+    if (has_conjuncts && !ExecNode::EvalConjuncts(conjunct_evals_.data(),
+            conjunct_evals_.size(), reinterpret_cast<TupleRow*>(&kudu_tuple))) {
       continue;
     }
     // Deep copy the tuple, set it in a new row, and commit the row.
@@ -238,7 +241,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
     // Move to the next tuple in the tuple buffer.
     *tuple_mem = next_tuple(*tuple_mem);
   }
-  ExprContext::FreeLocalAllocations(conjunct_ctxs_);
+  ScalarExprEvaluator::FreeLocalAllocations(conjunct_evals_);
 
   // Check the status in case an error status was set during conjunct evaluation.
   return state_->GetQueryStatus();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index 7a6ca76..e327e7a 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -21,6 +21,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <kudu/client/client.h>
 
+#include "common/object-pool.h"
 #include "exec/kudu-scan-node-base.h"
 #include "runtime/descriptors.h"
 
@@ -86,6 +87,13 @@ class KuduScanner {
   KuduScanNodeBase* scan_node_;
   RuntimeState* state_;
 
+  /// For objects which have the same life time as the scanner.
+  ObjectPool obj_pool_;
+
+  /// MemPool used for expression evaluators in this scanner. Need to be local
+  /// to each scanner as MemPool is not thread safe.
+  boost::scoped_ptr<MemPool> expr_mem_pool_;
+
   /// The kudu::client::KuduScanner for the current scan token. A new KuduScanner is
   /// created for each scan token using KuduScanToken::DeserializeIntoScanner().
   boost::scoped_ptr<kudu::client::KuduScanner> scanner_;
@@ -100,7 +108,7 @@ class KuduScanner {
   int64_t last_alive_time_micros_;
 
   /// The scanner's cloned copy of the conjuncts to apply.
-  vector<ExprContext*> conjunct_ctxs_;
+  vector<ScalarExprEvaluator*> conjunct_evals_;
 
   /// Timestamp slots in the tuple descriptor of the scan node. Used to convert Kudu
   /// UNIXTIME_MICRO values inline.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index b5ffc27..32aab79 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -21,8 +21,8 @@
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include "exec/kudu-util.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gutil/gscoped_ptr.h"
 #include "runtime/exec-env.h"
@@ -71,31 +71,18 @@ const static string& ROOT_PARTITION_KEY =
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
-KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
-    const vector<TExpr>& select_list_texprs,
-    const TDataSink& tsink)
+KuduTableSink::KuduTableSink(const RowDescriptor& row_desc, const TDataSink& tsink)
     : DataSink(row_desc),
       table_id_(tsink.table_sink.target_table_id),
-      select_list_texprs_(select_list_texprs),
       sink_action_(tsink.table_sink.action),
       kudu_table_sink_(tsink.table_sink.kudu_table_sink) {
+  DCHECK(tsink.__isset.table_sink);
   DCHECK(KuduIsAvailable());
 }
 
-Status KuduTableSink::PrepareExprs(RuntimeState* state) {
-  // From the thrift expressions create the real exprs.
-  RETURN_IF_ERROR(Expr::CreateExprTrees(state->obj_pool(), select_list_texprs_,
-                                        &output_expr_ctxs_));
-  // Prepare the exprs to run.
-  RETURN_IF_ERROR(
-      Expr::Prepare(output_expr_ctxs_, state, row_desc_, expr_mem_tracker_.get()));
-  return Status::OK();
-}
-
 Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(PrepareExprs(state));
 
   // Get the kudu table descriptor.
   TableDescriptor* table_desc = state->desc_tbl().GetTableDescriptor(table_id_);
@@ -130,7 +117,7 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke
 }
 
 Status KuduTableSink::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, state));
+  RETURN_IF_ERROR(DataSink::Open(state));
 
   int64_t required_mem = FLAGS_kudu_sink_mem_required;
   if (!mem_tracker_->TryConsume(required_mem)) {
@@ -206,7 +193,7 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
 
 Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ExprContext::FreeLocalAllocations(output_expr_ctxs_);
+  ScalarExprEvaluator::FreeLocalAllocations(output_expr_evals_);
   RETURN_IF_ERROR(state->CheckQueryState());
   const KuduSchema& table_schema = table_->schema();
 
@@ -224,14 +211,14 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
     unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
     bool add_row = true;
 
-    for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
-      // output_expr_ctxs_ only contains the columns that the op
+    for (int j = 0; j < output_expr_evals_.size(); ++j) {
+      // output_expr_evals_ only contains the columns that the op
       // applies to, i.e. columns explicitly mentioned in the query, and
       // referenced_columns is then used to map to actual column positions.
       int col = kudu_table_sink_.referenced_columns.empty() ?
           j : kudu_table_sink_.referenced_columns[j];
 
-      void* value = output_expr_ctxs_[j]->GetValue(current_row);
+      void* value = output_expr_evals_[j]->GetValue(current_row);
       if (value == nullptr) {
         if (table_schema.Column(col).is_nullable()) {
           KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),
@@ -249,7 +236,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
         }
       }
 
-      PrimitiveType type = output_expr_ctxs_[j]->root()->type().type;
+      PrimitiveType type = output_expr_evals_[j]->root().type().type;
       WriteKuduValue(col, type, value, true, write->mutable_row());
     }
     if (add_row) write_ops.push_back(move(write));
@@ -339,7 +326,6 @@ void KuduTableSink::Close(RuntimeState* state) {
     client_ = nullptr;
   }
   SCOPED_TIMER(profile()->total_time_counter());
-  Expr::Close(output_expr_ctxs_, state);
   DataSink::Close(state);
   closed_ = true;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index b2239fb..dcf5241 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -25,8 +25,8 @@
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "exec/data-sink.h"
-#include "exprs/expr-context.h"
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 
 namespace impala {
 
@@ -53,8 +53,7 @@ namespace impala {
 /// status. All reported errors (ignored or not) will be logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
-  KuduTableSink(const RowDescriptor& row_desc,
-      const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink);
+  KuduTableSink(const RowDescriptor& row_desc, const TDataSink& tsink);
 
   virtual std::string GetName() { return "KuduTableSink"; }
 
@@ -76,9 +75,6 @@ class KuduTableSink : public DataSink {
   virtual void Close(RuntimeState* state);
 
  private:
-  /// Turn thrift TExpr into Expr and prepare them to run
-  Status PrepareExprs(RuntimeState* state);
-
   /// Create a new write operation according to the sink type.
   kudu::client::KuduWriteOperation* NewWriteOp();
 
@@ -94,11 +90,6 @@ class KuduTableSink : public DataSink {
   /// The descriptor of the KuduTable being written to. Set on Prepare().
   const KuduTableDescriptor* table_desc_;
 
-  /// The expression descriptors and the prepared expressions. The latter are built
-  /// on Prepare().
-  const std::vector<TExpr>& select_list_texprs_;
-  std::vector<ExprContext*> output_expr_ctxs_;
-
   /// The Kudu client, owned by the ExecEnv.
   kudu::client::KuduClient* client_ = nullptr;
   /// The Kudu table and session.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index caa57b4..14d9ac8 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -20,7 +20,8 @@
 #include <sstream>
 #include <gutil/strings/substitute.h>
 
-#include "exprs/expr.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
@@ -50,18 +51,20 @@ NestedLoopJoinNode::~NestedLoopJoinNode() {
 Status NestedLoopJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
   RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state));
   DCHECK(tnode.__isset.nested_loop_join_node);
-  RETURN_IF_ERROR(Expr::CreateExprTrees(pool_, tnode.nested_loop_join_node.join_conjuncts,
-      &join_conjunct_ctxs_));
-
+  // join_conjunct_evals_ are evaluated in the context of rows assembled from
+  // all inner and outer tuples.
+  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
+  RETURN_IF_ERROR(ScalarExpr::Create(tnode.nested_loop_join_node.join_conjuncts,
+      full_row_desc, state, &join_conjuncts_));
   DCHECK(tnode.nested_loop_join_node.join_op != TJoinOp::CROSS_JOIN ||
-      join_conjunct_ctxs_.size() == 0) << "Join conjuncts in a cross join";
+      join_conjuncts_.size() == 0) << "Join conjuncts in a cross join";
   return Status::OK();
 }
 
 Status NestedLoopJoinNode::Open(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(BlockingJoinNode::Open(state));
-  RETURN_IF_ERROR(Expr::Open(join_conjunct_ctxs_, state));
+  RETURN_IF_ERROR(ScalarExprEvaluator::Open(join_conjunct_evals_, state));
 
   // Check for errors and free local allocations before opening children.
   RETURN_IF_CANCELLED(state);
@@ -95,13 +98,8 @@ Status NestedLoopJoinNode::Open(RuntimeState* state) {
 Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
-
-  // join_conjunct_ctxs_ are evaluated in the context of rows assembled from
-  // all inner and outer tuples.
-  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
-  RETURN_IF_ERROR(
-      Expr::Prepare(join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
-
+  RETURN_IF_ERROR(ScalarExprEvaluator::Create(join_conjuncts_, state,
+      pool_, expr_mem_pool(), &join_conjunct_evals_));
   builder_.reset(new NljBuilder(child(1)->row_desc(), state));
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
@@ -138,7 +136,8 @@ Status NestedLoopJoinNode::Reset(RuntimeState* state) {
 
 void NestedLoopJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  Expr::Close(join_conjunct_ctxs_, state);
+  ScalarExprEvaluator::Close(join_conjunct_evals_, state);
+  ScalarExpr::Close(join_conjuncts_);
   if (builder_ != NULL) {
     builder_->Close(state);
     builder_.reset();
@@ -291,8 +290,9 @@ Status NestedLoopJoinNode::GetNextLeftOuterJoin(RuntimeState* state,
 
 Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
     RowBatch* output_batch) {
-  ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
-  size_t num_join_ctxs = join_conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
+  size_t num_join_conjuncts = join_conjuncts_.size();
+  DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
@@ -309,7 +309,8 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
         RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(QueryMaintenance(state));
       }
-      if (!EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) {
+      if (!EvalConjuncts(
+              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
         continue;
       }
       // A match is found. Create the output row from the probe row.
@@ -336,8 +337,9 @@ Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state,
 
 Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state,
     RowBatch* output_batch) {
-  ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
-  size_t num_join_ctxs = join_conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
+  size_t num_join_conjuncts = join_conjuncts_.size();
+  DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
   while (!eos_) {
@@ -354,7 +356,8 @@ Status NestedLoopJoinNode::GetNextLeftAntiJoin(RuntimeState* state,
         RETURN_IF_CANCELLED(state);
         RETURN_IF_ERROR(QueryMaintenance(state));
       }
-      if (EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) {
+      if (EvalConjuncts(
+              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
         // Found a match for the probe row. This row will not be in the result.
         matched_probe_ = true;
         break;
@@ -388,8 +391,9 @@ Status NestedLoopJoinNode::GetNextRightOuterJoin(RuntimeState* state,
 
 Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
     RowBatch* output_batch) {
-  ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
-  size_t num_join_ctxs = join_conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
+  size_t num_join_conjuncts = join_conjuncts_.size();
+  DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
   DCHECK(matching_build_rows_ != NULL);
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
@@ -397,8 +401,8 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
     DCHECK(HasValidProbeRow());
     while (!build_row_iterator_.AtEnd()) {
       DCHECK(HasValidProbeRow());
-      // This loop can go on for a long time if the conjuncts are very selective. Do
-      // query maintenance every N iterations.
+      // This loop can go on for a long time if the conjuncts are very selective.
+      // Do query maintenance every N iterations.
       if ((current_build_row_idx_ & (N - 1)) == 0) {
         if (ReachedLimit()) {
           eos_ = true;
@@ -418,7 +422,8 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
       CreateOutputRow(semi_join_staging_row_, current_probe_row_,
           build_row_iterator_.GetRow());
       // Evaluate the join conjuncts on the semi-join staging row.
-      if (!EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) {
+      if (!EvalConjuncts(
+              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
         build_row_iterator_.Next();
         ++current_build_row_idx_;
         continue;
@@ -445,8 +450,9 @@ Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state,
 
 Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state,
     RowBatch* output_batch) {
-  ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
-  size_t num_join_ctxs = join_conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
+  size_t num_join_conjuncts = join_conjuncts_.size();
+  DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
   DCHECK(matching_build_rows_ != NULL);
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
 
@@ -468,7 +474,8 @@ Status NestedLoopJoinNode::GetNextRightAntiJoin(RuntimeState* state,
       }
       CreateOutputRow(semi_join_staging_row_, current_probe_row_,
           build_row_iterator_.GetRow());
-      if (EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, semi_join_staging_row_)) {
+      if (EvalConjuncts(
+              join_conjunct_evals, num_join_conjuncts, semi_join_staging_row_)) {
         matching_build_rows_->Set(current_build_row_idx_, true);
       }
       build_row_iterator_.Next();
@@ -501,8 +508,9 @@ Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state,
     RowBatch* output_batch) {
   DCHECK(!matched_probe_);
   DCHECK(current_probe_row_ != NULL);
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  size_t num_ctxs = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  size_t num_conjuncts = conjuncts_.size();
+  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
   TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
   if (join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN) {
     CreateOutputRow(output_row, current_probe_row_, NULL);
@@ -512,7 +520,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state,
     output_batch->CopyRow(current_probe_row_, output_row);
   }
   // Evaluate all the other (non-join) conjuncts.
-  if (EvalConjuncts(conjunct_ctxs, num_ctxs, output_row)) {
+  if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
     VLOG_ROW << "match row:" << PrintRow(output_row, row_desc());
     output_batch->CommitLastRow();
     ++num_rows_returned_;
@@ -530,8 +538,9 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
     current_build_row_idx_ = 0;
     process_unmatched_build_rows_ = true;
   }
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  size_t num_ctxs = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  size_t num_conjuncts = conjuncts_.size();
+  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
   DCHECK(matching_build_rows_ != NULL);
 
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
@@ -566,7 +575,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
     ++current_build_row_idx_;
     // Evaluate conjuncts that don't affect the matching rows of the join on the
     // result row.
-    if (EvalConjuncts(conjunct_ctxs, num_ctxs, output_row)) {
+    if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
       VLOG_ROW << "match row: " << PrintRow(output_row, row_desc());
       output_batch->CommitLastRow();
       ++num_rows_returned_;
@@ -584,10 +593,12 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
 Status NestedLoopJoinNode::FindBuildMatches(
     RuntimeState* state, RowBatch* output_batch, bool* return_output_batch) {
   *return_output_batch = false;
-  ExprContext* const* join_conjunct_ctxs = &join_conjunct_ctxs_[0];
-  size_t num_join_ctxs = join_conjunct_ctxs_.size();
-  ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
-  size_t num_ctxs = conjunct_ctxs_.size();
+  ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
+  size_t num_join_conjuncts = join_conjuncts_.size();
+  DCHECK_EQ(num_join_conjuncts, join_conjunct_evals_.size());
+  ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
+  size_t num_conjuncts = conjuncts_.size();
+  DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
 
   const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
   while (!build_row_iterator_.AtEnd()) {
@@ -609,12 +620,14 @@ Status NestedLoopJoinNode::FindBuildMatches(
       RETURN_IF_CANCELLED(state);
       RETURN_IF_ERROR(QueryMaintenance(state));
     }
-    if (!EvalConjuncts(join_conjunct_ctxs, num_join_ctxs, output_row)) continue;
+    if (!EvalConjuncts(join_conjunct_evals, num_join_conjuncts, output_row)) {
+      continue;
+    }
     matched_probe_ = true;
     if (matching_build_rows_ != NULL) {
       matching_build_rows_->Set(current_build_row_idx_ - 1, true);
     }
-    if (!EvalConjuncts(conjunct_ctxs, num_ctxs, output_row)) continue;
+    if (!EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) continue;
     VLOG_ROW << "match row: " << PrintRow(output_row, row_desc());
     output_batch->CommitLastRow();
     ++num_rows_returned_;
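
The same pattern recurs in every GetNext* variant above: cache the raw evaluator
pointers once, evaluate the join conjuncts on a staging row, and interleave
periodic query maintenance so the loop stays responsive even when the conjuncts
are highly selective. A condensed sketch of that inner loop (not the committed
code; it only reuses members and helpers visible in this diff) looks like:

  ScalarExprEvaluator* const* join_conjunct_evals = join_conjunct_evals_.data();
  const size_t num_join_conjuncts = join_conjuncts_.size();
  // N is a power of two, so (idx & (N - 1)) == 0 is a cheap "every N rows" test.
  const int N = BitUtil::RoundUpToPowerOfTwo(state->batch_size());
  while (!build_row_iterator_.AtEnd()) {
    if ((current_build_row_idx_ & (N - 1)) == 0) {
      RETURN_IF_CANCELLED(state);
      RETURN_IF_ERROR(QueryMaintenance(state));
    }
    CreateOutputRow(semi_join_staging_row_, current_probe_row_,
        build_row_iterator_.GetRow());
    if (EvalConjuncts(join_conjunct_evals, num_join_conjuncts,
            semi_join_staging_row_)) {
      // Emit or mark the match, depending on the join mode.
    }
    build_row_iterator_.Next();
    ++current_build_row_idx_;
  }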

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/nested-loop-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 1d3d2da..94f1dae 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -83,7 +83,8 @@ class NestedLoopJoinNode : public BlockingJoinNode {
   /////////////////////////////////////////
 
   /// Join conjuncts
-  std::vector<ExprContext*> join_conjunct_ctxs_;
+  std::vector<ScalarExpr*> join_conjuncts_;
+  std::vector<ScalarExprEvaluator*> join_conjunct_evals_;
 
   /// Optimized build for the case where the right child is a SingularRowSrcNode.
   Status ConstructSingularBuildSide(RuntimeState* state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/old-hash-table-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-ir.cc b/be/src/exec/old-hash-table-ir.cc
index ef94892..2436ef1 100644
--- a/be/src/exec/old-hash-table-ir.cc
+++ b/be/src/exec/old-hash-table-ir.cc
@@ -29,12 +29,12 @@ uint8_t* OldHashTable::expr_value_null_bits() const {
   return expr_value_null_bits_;
 }
 
-ExprContext* const* OldHashTable::build_expr_ctxs() const {
-  return &build_expr_ctxs_[0];
+ScalarExprEvaluator* const* OldHashTable::build_expr_evals() const {
+  return build_expr_evals_.data();
 }
 
-ExprContext* const* OldHashTable::probe_expr_ctxs() const {
-  return &probe_expr_ctxs_[0];
+ScalarExprEvaluator* const* OldHashTable::probe_expr_evals() const {
+  return probe_expr_evals_.data();
 }
 
 }
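
A small note on the accessor change above: switching from &vec[0] to vec.data()
is not just cosmetic. For an empty vector, operator[] on index 0 is undefined
behavior, while data() is always well defined (the returned pointer simply must
not be dereferenced when the vector is empty). A hypothetical illustration:

  std::vector<ScalarExprEvaluator*> evals;       // possibly empty
  // ScalarExprEvaluator* const* p = &evals[0];  // UB when evals is empty
  ScalarExprEvaluator* const* p = evals.data();  // well defined for any size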

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b38d9826/be/src/exec/old-hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-test.cc b/be/src/exec/old-hash-table-test.cc
index 31e0cac..e873791 100644
--- a/be/src/exec/old-hash-table-test.cc
+++ b/be/src/exec/old-hash-table-test.cc
@@ -22,8 +22,8 @@
 
 #include "common/compiler-util.h"
 #include "exec/old-hash-table.inline.h"
-#include "exprs/expr.h"
-#include "exprs/expr-context.h"
+#include "exprs/scalar-expr.h"
+#include "exprs/scalar-expr-evaluator.h"
 #include "exprs/slot-ref.h"
 #include "runtime/mem-pool.h"
 #include "runtime/mem-tracker.h"
@@ -45,30 +45,37 @@ class OldHashTableTest : public testing::Test {
   ObjectPool pool_;
   MemTracker tracker_;
   MemPool mem_pool_;
-  vector<ExprContext*> build_expr_ctxs_;
-  vector<ExprContext*> probe_expr_ctxs_;
+
+  vector<ScalarExpr*> build_exprs_;
+  vector<ScalarExprEvaluator*> build_expr_evals_;
+  vector<ScalarExpr*> probe_exprs_;
+  vector<ScalarExprEvaluator*> probe_expr_evals_;
 
   virtual void SetUp() {
     RowDescriptor desc;
-    Status status;
-
     // Not very easy to test complex tuple layouts so this test will use the
     // simplest.  The purpose of these tests is to exercise the hash map
     // internals so a simple build/probe expr is fine.
-    Expr* expr = pool_.Add(new SlotRef(TYPE_INT, 0));
-    build_expr_ctxs_.push_back(pool_.Add(new ExprContext(expr)));
-    ASSERT_OK(Expr::Prepare(build_expr_ctxs_, NULL, desc, &tracker_));
-    ASSERT_OK(Expr::Open(build_expr_ctxs_, NULL));
-
-    expr = pool_.Add(new SlotRef(TYPE_INT, 0));
-    probe_expr_ctxs_.push_back(pool_.Add(new ExprContext(expr)));
-    ASSERT_OK(Expr::Prepare(probe_expr_ctxs_, NULL, desc, &tracker_));
-    ASSERT_OK(Expr::Open(probe_expr_ctxs_, NULL));
+    ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 0));
+    ASSERT_OK(build_expr->Init(desc, nullptr));
+    build_exprs_.push_back(build_expr);
+    ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_,
+        &build_expr_evals_));
+    ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr));
+
+    ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 0));
+    ASSERT_OK(probe_expr->Init(desc, nullptr));
+    probe_exprs_.push_back(probe_expr);
+    ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_,
+        &probe_expr_evals_));
+    ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
   }
 
   virtual void TearDown() {
-    Expr::Close(build_expr_ctxs_, NULL);
-    Expr::Close(probe_expr_ctxs_, NULL);
+    ScalarExprEvaluator::Close(build_expr_evals_, nullptr);
+    ScalarExprEvaluator::Close(probe_expr_evals_, nullptr);
+    ScalarExpr::Close(build_exprs_);
+    ScalarExpr::Close(probe_exprs_);
   }
 
   TupleRow* CreateTupleRow(int32_t val) {
@@ -95,10 +102,10 @@ class OldHashTableTest : public testing::Test {
     OldHashTable::Iterator iter = table->Begin();
     while (iter != table->End()) {
       TupleRow* row = iter.GetRow();
-      int32_t val = *reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(row));
+      int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row));
       EXPECT_GE(val, min);
       EXPECT_LT(val, max);
-      if (all_unique) EXPECT_TRUE(results[val] == NULL);
+      if (all_unique) EXPECT_TRUE(results[val] == nullptr);
       EXPECT_EQ(row->GetTuple(0), expected[val]->GetTuple(0));
       results[val] = row;
       iter.Next<false>();
@@ -110,9 +117,9 @@ class OldHashTableTest : public testing::Test {
   void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) {
     EXPECT_TRUE(probe_row != build_row);
     int32_t build_val =
-        *reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(probe_row));
+        *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(probe_row));
     int32_t probe_val =
-        *reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(build_row));
+        *reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(build_row));
     EXPECT_EQ(build_val, probe_val);
   }
 
@@ -160,16 +167,16 @@ TEST_F(OldHashTableTest, SetupTest) {
   TupleRow* probe_row4 = CreateTupleRow(4);
 
   int32_t* val_row1 =
-      reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(build_row1));
+      reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row1));
   EXPECT_EQ(*val_row1, 1);
   int32_t* val_row2 =
-      reinterpret_cast<int32_t*>(build_expr_ctxs_[0]->GetValue(build_row2));
+      reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row2));
   EXPECT_EQ(*val_row2, 2);
   int32_t* val_row3 =
-      reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(probe_row3));
+      reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row3));
   EXPECT_EQ(*val_row3, 3);
   int32_t* val_row4 =
-      reinterpret_cast<int32_t*>(probe_expr_ctxs_[0]->GetValue(probe_row4));
+      reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row4));
   EXPECT_EQ(*val_row4, 4);
 
   mem_pool_.FreeAll();
@@ -196,53 +203,56 @@ TEST_F(OldHashTableTest, BasicTest) {
 
   // Create the hash table and insert the build rows
   MemTracker tracker;
-  OldHashTable hash_table(NULL, build_expr_ctxs_, probe_expr_ctxs_,
-      vector<ExprContext*>(), 1, false, std::vector<bool>(build_expr_ctxs_.size(), false),
-      0, &tracker, vector<RuntimeFilter*>());
+  scoped_ptr<OldHashTable> hash_table;
+  EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
+      vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
+      0, &tracker, vector<RuntimeFilter*>(), &hash_table));
+  EXPECT_OK(hash_table->Open(nullptr));
   for (int i = 0; i < 5; ++i) {
-    hash_table.Insert(build_rows[i]);
+    hash_table->Insert(build_rows[i]);
   }
-  EXPECT_EQ(hash_table.size(), 5);
+  EXPECT_EQ(hash_table->size(), 5);
 
   // Do a full table scan and validate returned pointers
-  FullScan(&hash_table, 0, 5, true, scan_rows, build_rows);
-  ProbeTest(&hash_table, probe_rows, 10, false);
+  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
+  ProbeTest(hash_table.get(), probe_rows, 10, false);
 
   // Resize and scan again
-  ResizeTable(&hash_table, 64);
-  EXPECT_EQ(hash_table.num_buckets(), 64);
-  EXPECT_EQ(hash_table.size(), 5);
+  ResizeTable(hash_table.get(), 64);
+  EXPECT_EQ(hash_table->num_buckets(), 64);
+  EXPECT_EQ(hash_table->size(), 5);
   memset(scan_rows, 0, sizeof(scan_rows));
-  FullScan(&hash_table, 0, 5, true, scan_rows, build_rows);
-  ProbeTest(&hash_table, probe_rows, 10, false);
+  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
+  ProbeTest(hash_table.get(), probe_rows, 10, false);
 
   // Resize to two and cause some collisions
-  ResizeTable(&hash_table, 2);
-  EXPECT_EQ(hash_table.num_buckets(), 2);
-  EXPECT_EQ(hash_table.size(), 5);
+  ResizeTable(hash_table.get(), 2);
+  EXPECT_EQ(hash_table->num_buckets(), 2);
+  EXPECT_EQ(hash_table->size(), 5);
   memset(scan_rows, 0, sizeof(scan_rows));
-  FullScan(&hash_table, 0, 5, true, scan_rows, build_rows);
-  ProbeTest(&hash_table, probe_rows, 10, false);
+  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
+  ProbeTest(hash_table.get(), probe_rows, 10, false);
 
   // Resize to one and turn it into a linked list
-  ResizeTable(&hash_table, 1);
-  EXPECT_EQ(hash_table.num_buckets(), 1);
-  EXPECT_EQ(hash_table.size(), 5);
+  ResizeTable(hash_table.get(), 1);
+  EXPECT_EQ(hash_table->num_buckets(), 1);
+  EXPECT_EQ(hash_table->size(), 5);
   memset(scan_rows, 0, sizeof(scan_rows));
-  FullScan(&hash_table, 0, 5, true, scan_rows, build_rows);
-  ProbeTest(&hash_table, probe_rows, 10, false);
+  FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
+  ProbeTest(hash_table.get(), probe_rows, 10, false);
 
-  hash_table.Close();
+  hash_table->Close(nullptr);
   mem_pool_.FreeAll();
 }
 
 // This tests makes sure we can scan ranges of buckets
 TEST_F(OldHashTableTest, ScanTest) {
   MemTracker tracker;
-  OldHashTable hash_table(NULL, build_expr_ctxs_, probe_expr_ctxs_,
-      vector<ExprContext*>(), 1, false,
-      std::vector<bool>(build_expr_ctxs_.size(), false), 0, &tracker,
-      vector<RuntimeFilter*>());
+  scoped_ptr<OldHashTable> hash_table;
+  EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
+      vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
+      0, &tracker, vector<RuntimeFilter*>(), &hash_table));
+  EXPECT_OK(hash_table->Open(nullptr));
   // Add 1 row with val 1, 2 with val 2, etc
   vector<TupleRow*> build_rows;
   ProbeTestData probe_rows[15];
@@ -251,7 +261,7 @@ TEST_F(OldHashTableTest, ScanTest) {
     probe_rows[val].probe_row = CreateTupleRow(val);
     for (int i = 0; i < val; ++i) {
       TupleRow* row = CreateTupleRow(val);
-      hash_table.Insert(row);
+      hash_table->Insert(row);
       build_rows.push_back(row);
       probe_rows[val].expected_build_rows.push_back(row);
     }
@@ -263,22 +273,22 @@ TEST_F(OldHashTableTest, ScanTest) {
   }
 
   // Test that all the builds were found
-  ProbeTest(&hash_table, probe_rows, 15, true);
+  ProbeTest(hash_table.get(), probe_rows, 15, true);
 
   // Resize and try again
-  ResizeTable(&hash_table, 128);
-  EXPECT_EQ(hash_table.num_buckets(), 128);
-  ProbeTest(&hash_table, probe_rows, 15, true);
+  ResizeTable(hash_table.get(), 128);
+  EXPECT_EQ(hash_table->num_buckets(), 128);
+  ProbeTest(hash_table.get(), probe_rows, 15, true);
 
-  ResizeTable(&hash_table, 16);
-  EXPECT_EQ(hash_table.num_buckets(), 16);
-  ProbeTest(&hash_table, probe_rows, 15, true);
+  ResizeTable(hash_table.get(), 16);
+  EXPECT_EQ(hash_table->num_buckets(), 16);
+  ProbeTest(hash_table.get(), probe_rows, 15, true);
 
-  ResizeTable(&hash_table, 2);
-  EXPECT_EQ(hash_table.num_buckets(), 2);
-  ProbeTest(&hash_table, probe_rows, 15, true);
+  ResizeTable(hash_table.get(), 2);
+  EXPECT_EQ(hash_table->num_buckets(), 2);
+  ProbeTest(hash_table.get(), probe_rows, 15, true);
 
-  hash_table.Close();
+  hash_table->Close(nullptr);
   mem_pool_.FreeAll();
 }
 
@@ -287,37 +297,38 @@ TEST_F(OldHashTableTest, GrowTableTest) {
   int num_to_add = 4;
   int expected_size = 0;
   MemTracker tracker(100 * 1024 * 1024);
-  OldHashTable hash_table(NULL, build_expr_ctxs_, probe_expr_ctxs_,
-      vector<ExprContext*>(), 1, false,
-      std::vector<bool>(build_expr_ctxs_.size(), false), 0, &tracker,
-      vector<RuntimeFilter*>(), false, num_to_add);
-  EXPECT_FALSE(hash_table.mem_limit_exceeded());
+  scoped_ptr<OldHashTable> hash_table;
+  EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
+      vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
+      0, &tracker, vector<RuntimeFilter*>(), &hash_table, false, num_to_add));
+  EXPECT_OK(hash_table->Open(nullptr));
+  EXPECT_FALSE(hash_table->mem_limit_exceeded());
   EXPECT_TRUE(!tracker.LimitExceeded());
 
   // This inserts about 5M entries
   int build_row_val = 0;
   for (int i = 0; i < 20; ++i) {
     for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
-      hash_table.Insert(CreateTupleRow(build_row_val));
+      hash_table->Insert(CreateTupleRow(build_row_val));
     }
     expected_size += num_to_add;
     num_to_add *= 2;
   }
-  EXPECT_TRUE(hash_table.mem_limit_exceeded());
+  EXPECT_TRUE(hash_table->mem_limit_exceeded());
   EXPECT_TRUE(tracker.LimitExceeded());
 
   // Validate that we can find the entries before we went over the limit
   for (int i = 0; i < expected_size * 5; i += 100000) {
     TupleRow* probe_row = CreateTupleRow(i);
-    OldHashTable::Iterator iter = hash_table.Find(probe_row);
-    if (i < hash_table.size()) {
-      EXPECT_TRUE(iter != hash_table.End());
+    OldHashTable::Iterator iter = hash_table->Find(probe_row);
+    if (i < hash_table->size()) {
+      EXPECT_TRUE(iter != hash_table->End());
       ValidateMatch(probe_row, iter.GetRow());
     } else {
-      EXPECT_TRUE(iter == hash_table.End());
+      EXPECT_TRUE(iter == hash_table->End());
     }
   }
-  hash_table.Close();
+  hash_table->Close(nullptr);
   mem_pool_.FreeAll();
 }
 

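The test changes above also double as a compact reference for the new expression
lifecycle that replaces Expr/ExprContext: an expr is created and Init()'d once,
evaluators are then created, opened, used, and closed, and the exprs themselves
are closed last. A minimal sketch, condensed from the SetUp() and TearDown()
methods in this diff (it assumes the fixture's ObjectPool pool_, MemPool
mem_pool_, and a default-constructed RowDescriptor):

  RowDescriptor desc;
  vector<ScalarExpr*> exprs;
  vector<ScalarExprEvaluator*> evals;

  ScalarExpr* expr = pool_.Add(new SlotRef(TYPE_INT, 0));
  ASSERT_OK(expr->Init(desc, nullptr));                 // resolve against the row layout
  exprs.push_back(expr);
  ASSERT_OK(ScalarExprEvaluator::Create(exprs, nullptr, &pool_, &mem_pool_, &evals));
  ASSERT_OK(ScalarExprEvaluator::Open(evals, nullptr)); // acquire runtime state

  // ... evaluate with evals[0]->GetValue(row) ...

  ScalarExprEvaluator::Close(evals, nullptr);           // close evaluators first
  ScalarExpr::Close(exprs);                             // then the exprs themselves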