IMPALA-3344: Simplify sorter and document/enforce invariants.

Clarify relationships between classes, clean up the previous mess
where every class was friends with the other so there's an actual
distinction between public and private members. TupleIterator
is now no longer tied to TupleSorter, just Run.

Document and enforce invariants in many cases.

Factor out some functions from large functions.

Simplify and document iterator logic.

Make management of buffers when iterating over output stream more
explicitly correct: either use MarkNeedToReturn() or attach block
to the batch as appropriate. The SortedRunMerger didn't handle
resource transfer correctly, except if all the memory came from
the batch's MemPool. This patch fixes the cases when resources
are attached to the batches, but not the 'need_to_return' case.
Document that SortedRunMerger requires 'deep_copy_input' to be true
if batches can have the 'need_to_return' flag set.

Also use the atomic block exchange operation when moving between
blocks in unpinned runs to prevent pin failures at that point.
I explicitly have avoided changing the hairy block management logic
when allocating buffers for merging, that will need addressing in
a follow-up patch.

Add a SpilledRuns counter so that it's more explicit that spilling
occurred.

Testing:
Added some tests for corner cases with empty and NULL strings.
Fixed a test that previously failed with OOM but now succeeds.

Performance:
Benchmarking against old code initial revealed some regressions from
changes in inlining. Force inlining the TupleComparator::operator() and
iterator Next()/Prev() functions helped and performance seems similar or
slightly better on the targeted orderby benchmarks.

Change-Id: I9c619e81fd1b8ac50e257172c8bce101a112b52a
Reviewed-on: http://gerrit.cloudera.org:8080/2826
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Tim Armstrong <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/37ec2539
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/37ec2539
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/37ec2539

Branch: refs/heads/master
Commit: 37ec25396f2964c386655ce3408b32a73f71edf6
Parents: b14ca6d
Author: Tim Armstrong <[email protected]>
Authored: Wed May 18 12:05:53 2016 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu Jun 2 21:33:08 2016 -0700

----------------------------------------------------------------------
 be/src/exec/sort-node.cc                        |    1 +
 be/src/exprs/agg-fn-evaluator.cc                |    2 +-
 be/src/exprs/agg-fn-evaluator.h                 |   18 +-
 be/src/exprs/case-expr.cc                       |    4 +-
 be/src/exprs/case-expr.h                        |   22 +-
 be/src/exprs/compound-predicates.cc             |    4 +-
 be/src/exprs/compound-predicates.h              |    4 +-
 be/src/exprs/conditional-functions-ir.cc        |    8 +-
 be/src/exprs/conditional-functions.h            |   80 +-
 be/src/exprs/expr-context.cc                    |   10 +-
 be/src/exprs/expr-context.h                     |   10 +-
 be/src/exprs/expr-ir.cc                         |   20 +-
 be/src/exprs/expr.cc                            |   22 +-
 be/src/exprs/expr.h                             |   46 +-
 be/src/exprs/hive-udf-call.cc                   |   22 +-
 be/src/exprs/hive-udf-call.h                    |   22 +-
 be/src/exprs/is-not-empty-predicate.cc          |    2 +-
 be/src/exprs/is-not-empty-predicate.h           |    2 +-
 be/src/exprs/literal.cc                         |   18 +-
 be/src/exprs/literal.h                          |   18 +-
 be/src/exprs/null-literal.cc                    |   22 +-
 be/src/exprs/null-literal.h                     |   22 +-
 be/src/exprs/scalar-fn-call.cc                  |   44 +-
 be/src/exprs/scalar-fn-call.h                   |   26 +-
 be/src/exprs/slot-ref.cc                        |   22 +-
 be/src/exprs/slot-ref.h                         |   22 +-
 be/src/exprs/tuple-is-null-predicate.cc         |    2 +-
 be/src/exprs/tuple-is-null-predicate.h          |    2 +-
 be/src/runtime/data-stream-recvr.cc             |    2 +-
 be/src/runtime/row-batch.h                      |    1 +
 be/src/runtime/sorted-run-merger.cc             |   95 +-
 be/src/runtime/sorted-run-merger.h              |   37 +-
 be/src/runtime/sorter.cc                        | 1388 ++++++++++--------
 be/src/runtime/sorter.h                         |   75 +-
 be/src/runtime/tuple-row.h                      |    6 +-
 be/src/util/tuple-row-compare.h                 |   15 +-
 .../queries/QueryTest/analytic-fns.test         |   26 +-
 .../QueryTest/single-node-large-sorts.test      |    1 +
 .../queries/QueryTest/sort.test                 |   34 +
 .../queries/QueryTest/spilling.test             |    5 +
 tests/query_test/test_sort.py                   |   25 +-
 41 files changed, 1251 insertions(+), 956 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 3ceaf00..46dda88 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -131,6 +131,7 @@ Status SortNode::Reset(RuntimeState* state) {
 void SortNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   sort_exec_exprs_.Close(state);
+  if (sorter_ != NULL) sorter_->Close();
   sorter_.reset();
   ExecNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/agg-fn-evaluator.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.cc b/be/src/exprs/agg-fn-evaluator.cc
index 981b5f5..beb5211 100644
--- a/be/src/exprs/agg-fn-evaluator.cc
+++ b/be/src/exprs/agg-fn-evaluator.cc
@@ -339,7 +339,7 @@ static void SetAnyVal(const SlotDescriptor* desc, Tuple* 
tuple, AnyVal* dst) {
 }
 
 void AggFnEvaluator::Update(
-    FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst, void* fn) {
+    FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, void* fn) {
   if (fn == NULL) return;
 
   SetAnyVal(intermediate_slot_desc_, dst, staging_intermediate_val_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index 77ecd47..4661496 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -129,11 +129,11 @@ class AggFnEvaluator {
   /// Updates the intermediate state dst based on adding the input src row. 
This can be
   /// called either to drive the UDA's Update() or Merge() function depending 
on
   /// is_merge_. That is, from the caller, it doesn't mater.
-  void Add(FunctionContext* agg_fn_ctx, TupleRow* src, Tuple* dst);
+  void Add(FunctionContext* agg_fn_ctx, const TupleRow* src, Tuple* dst);
 
   /// Updates the intermediate state dst to remove the input src row, i.e. 
undoes
   /// Add(src, dst). Only used internally for analytic fn builtins.
-  void Remove(FunctionContext* agg_fn_ctx, TupleRow* src, Tuple* dst);
+  void Remove(FunctionContext* agg_fn_ctx, const TupleRow* src, Tuple* dst);
 
   /// Explicitly does a merge, even if this evalutor is not marked as merging.
   /// This is used by the partitioned agg node when it needs to merge spill 
results.
@@ -153,9 +153,9 @@ class AggFnEvaluator {
   static void Init(const std::vector<AggFnEvaluator*>& evaluators,
       const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst);
   static void Add(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst);
+      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, 
Tuple* dst);
   static void Remove(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst);
+      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, 
Tuple* dst);
   static void Serialize(const std::vector<AggFnEvaluator*>& evaluators,
       const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst);
   static void GetValue(const std::vector<AggFnEvaluator*>& evaluators,
@@ -222,7 +222,7 @@ class AggFnEvaluator {
   /// taking TupleRow to the UDA signature taking AnvVals by populating the 
staging
   /// AnyVals.
   /// fn must be a function that implement's the UDA Update() signature.
-  void Update(FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst, void* 
fn);
+  void Update(FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, 
void* fn);
 
   /// Sets up the arguments to call fn. This converts from the agg-expr 
signature,
   /// taking TupleRow to the UDA signature taking AnvVals. Writes the 
serialize/finalize
@@ -237,12 +237,12 @@ class AggFnEvaluator {
 };
 
 inline void AggFnEvaluator::Add(
-    FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst) {
+    FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst) {
   agg_fn_ctx->impl()->IncrementNumUpdates();
   Update(agg_fn_ctx, row, dst, is_merge() ? merge_fn_ : update_fn_);
 }
 inline void AggFnEvaluator::Remove(
-    FunctionContext* agg_fn_ctx, TupleRow* row, Tuple* dst) {
+    FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst) {
   agg_fn_ctx->impl()->IncrementNumRemoves();
   Update(agg_fn_ctx, row, dst, remove_fn_);
 }
@@ -267,14 +267,14 @@ inline void AggFnEvaluator::Init(const 
std::vector<AggFnEvaluator*>& evaluators,
   }
 }
 inline void AggFnEvaluator::Add(const std::vector<AggFnEvaluator*>& evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst) 
{
+      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, 
Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Add(fn_ctxs[i], src, dst);
   }
 }
 inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& 
evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, TupleRow* src, Tuple* dst) 
{
+      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, 
Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Remove(fn_ctxs[i], src, dst);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/case-expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/case-expr.cc b/be/src/exprs/case-expr.cc
index 595ae7e..394eefc 100644
--- a/be/src/exprs/case-expr.cc
+++ b/be/src/exprs/case-expr.cc
@@ -290,7 +290,7 @@ Status CaseExpr::GetCodegendComputeFn(RuntimeState* state, 
Function** fn) {
   return Status::OK();
 }
 
-void CaseExpr::GetChildVal(int child_idx, ExprContext* ctx, TupleRow* row, 
AnyVal* dst) {
+void CaseExpr::GetChildVal(int child_idx, ExprContext* ctx, const TupleRow* 
row, AnyVal* dst) {
   switch (children()[child_idx]->type().type) {
     case TYPE_BOOLEAN:
       *reinterpret_cast<BooleanVal*>(dst) = 
children()[child_idx]->GetBooleanVal(ctx, row);
@@ -368,7 +368,7 @@ bool CaseExpr::AnyValEq(const ColumnType& type, const 
AnyVal* v1, const AnyVal*
 }
 
 #define CASE_COMPUTE_FN(THEN_TYPE) \
-  THEN_TYPE CaseExpr::Get##THEN_TYPE(ExprContext* ctx, TupleRow* row) { \
+  THEN_TYPE CaseExpr::Get##THEN_TYPE(ExprContext* ctx, const TupleRow* row) { \
     FunctionContext* fn_ctx = ctx->fn_context(fn_context_index_); \
     CaseExprState* state = reinterpret_cast<CaseExprState*>( \
         fn_ctx->GetFunctionState(FunctionContext::THREAD_LOCAL)); \

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/case-expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/case-expr.h b/be/src/exprs/case-expr.h
index 5abc34c..0d9ad12 100644
--- a/be/src/exprs/case-expr.h
+++ b/be/src/exprs/case-expr.h
@@ -29,16 +29,16 @@ class CaseExpr: public Expr {
  public:
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
 
-  virtual BooleanVal GetBooleanVal(ExprContext* ctx, TupleRow* row);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* ctx, TupleRow* row);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* ctx, TupleRow* row);
-  virtual IntVal GetIntVal(ExprContext* ctx, TupleRow* row);
-  virtual BigIntVal GetBigIntVal(ExprContext* ctx, TupleRow* row);
-  virtual FloatVal GetFloatVal(ExprContext* ctx, TupleRow* row);
-  virtual DoubleVal GetDoubleVal(ExprContext* ctx, TupleRow* row);
-  virtual StringVal GetStringVal(ExprContext* ctx, TupleRow* row);
-  virtual TimestampVal GetTimestampVal(ExprContext* ctx, TupleRow* row);
-  virtual DecimalVal GetDecimalVal(ExprContext* ctx, TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ExprContext* ctx, const TupleRow* row);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* ctx, const TupleRow* row);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* ctx, const TupleRow* row);
+  virtual IntVal GetIntVal(ExprContext* ctx, const TupleRow* row);
+  virtual BigIntVal GetBigIntVal(ExprContext* ctx, const TupleRow* row);
+  virtual FloatVal GetFloatVal(ExprContext* ctx, const TupleRow* row);
+  virtual DoubleVal GetDoubleVal(ExprContext* ctx, const TupleRow* row);
+  virtual StringVal GetStringVal(ExprContext* ctx, const TupleRow* row);
+  virtual TimestampVal GetTimestampVal(ExprContext* ctx, const TupleRow* row);
+  virtual DecimalVal GetDecimalVal(ExprContext* ctx, const TupleRow* row);
 
  protected:
   friend class Expr;
@@ -65,7 +65,7 @@ class CaseExpr: public Expr {
 
   /// Populates 'dst' with the result of calling the appropriate Get*Val() 
function on the
   /// specified child expr.
-  void GetChildVal(int child_idx, ExprContext* ctx, TupleRow* row, AnyVal* 
dst);
+  void GetChildVal(int child_idx, ExprContext* ctx, const TupleRow* row, 
AnyVal* dst);
 
   /// Return true iff *v1 == *v2. v1 and v2 should both be of the specified 
type.
   bool AnyValEq(const ColumnType& type, const AnyVal* v1, const AnyVal* v2);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/compound-predicates.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/compound-predicates.cc 
b/be/src/exprs/compound-predicates.cc
index 3ee7f8a..d2f0134 100644
--- a/be/src/exprs/compound-predicates.cc
+++ b/be/src/exprs/compound-predicates.cc
@@ -25,7 +25,7 @@ using namespace impala;
 using namespace llvm;
 
 // (<> && false) is false, (true && NULL) is NULL
-BooleanVal AndPredicate::GetBooleanVal(ExprContext* context, TupleRow* row) {
+BooleanVal AndPredicate::GetBooleanVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(children_.size(), 2);
   BooleanVal val1 = children_[0]->GetBooleanVal(context, row);
   if (!val1.is_null && !val1.val) return BooleanVal(false); // short-circuit
@@ -44,7 +44,7 @@ string AndPredicate::DebugString() const {
 }
 
 // (<> || true) is true, (false || NULL) is NULL
-BooleanVal OrPredicate::GetBooleanVal(ExprContext* context, TupleRow* row) {
+BooleanVal OrPredicate::GetBooleanVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(children_.size(), 2);
   BooleanVal val1 = children_[0]->GetBooleanVal(context, row);
   if (!val1.is_null && val1.val) return BooleanVal(true); // short-circuit

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/compound-predicates.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/compound-predicates.h 
b/be/src/exprs/compound-predicates.h
index b391831..46c6e87 100644
--- a/be/src/exprs/compound-predicates.h
+++ b/be/src/exprs/compound-predicates.h
@@ -37,7 +37,7 @@ class CompoundPredicate: public Predicate {
 /// Expr for evaluating and (&&) operators
 class AndPredicate: public CompoundPredicate {
  public:
-  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, 
TupleRow*);
+  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, const 
TupleRow*);
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn) {
     return CompoundPredicate::CodegenComputeFn(true, state, fn);
@@ -56,7 +56,7 @@ class AndPredicate: public CompoundPredicate {
 /// Expr for evaluating or (||) operators
 class OrPredicate: public CompoundPredicate {
  public:
-  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, 
TupleRow*);
+  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, const 
TupleRow*);
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn) {
     return CompoundPredicate::CodegenComputeFn(false, state, fn);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/conditional-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/conditional-functions-ir.cc 
b/be/src/exprs/conditional-functions-ir.cc
index 8581104..5af143c 100644
--- a/be/src/exprs/conditional-functions-ir.cc
+++ b/be/src/exprs/conditional-functions-ir.cc
@@ -20,7 +20,7 @@ using namespace impala;
 using namespace impala_udf;
 
 #define IS_NULL_COMPUTE_FUNCTION(type) \
-  type IsNullExpr::Get##type(ExprContext* context, TupleRow* row) { \
+  type IsNullExpr::Get##type(ExprContext* context, const TupleRow* row) { \
     DCHECK_EQ(children_.size(), 2); \
     type val = children_[0]->Get##type(context, row); \
     if (!val.is_null) return val; /* short-circuit */ \
@@ -39,7 +39,7 @@ IS_NULL_COMPUTE_FUNCTION(TimestampVal);
 IS_NULL_COMPUTE_FUNCTION(DecimalVal);
 
 #define NULL_IF_COMPUTE_FUNCTION(AnyValType) \
-  AnyValType NullIfExpr::Get##AnyValType(ExprContext* ctx, TupleRow* row) { \
+  AnyValType NullIfExpr::Get##AnyValType(ExprContext* ctx, const TupleRow* 
row) { \
     DCHECK_EQ(children_.size(), 2); \
     AnyValType lhs_val = children_[0]->Get##AnyValType(ctx, row); \
     /* Short-circuit in case lhs_val is NULL. Can never be equal to RHS. */ \
@@ -112,7 +112,7 @@ ZERO_IF_NULL_COMPUTE_FUNCTION(DoubleVal);
 ZERO_IF_NULL_COMPUTE_FUNCTION(DecimalVal);
 
 #define IF_COMPUTE_FUNCTION(type) \
-  type IfExpr::Get##type(ExprContext* context, TupleRow* row) { \
+  type IfExpr::Get##type(ExprContext* context, const TupleRow* row) { \
     DCHECK_EQ(children_.size(), 3); \
     BooleanVal cond = children_[0]->GetBooleanVal(context, row); \
     if (cond.is_null || !cond.val) { \
@@ -133,7 +133,7 @@ IF_COMPUTE_FUNCTION(TimestampVal);
 IF_COMPUTE_FUNCTION(DecimalVal);
 
 #define COALESCE_COMPUTE_FUNCTION(type) \
-  type CoalesceExpr::Get##type(ExprContext* context, TupleRow* row) { \
+  type CoalesceExpr::Get##type(ExprContext* context, const TupleRow* row) { \
     DCHECK_GE(children_.size(), 1); \
     for (int i = 0; i < children_.size(); ++i) {                  \
       type val = children_[i]->Get##type(context, row); \

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/conditional-functions.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/conditional-functions.h 
b/be/src/exprs/conditional-functions.h
index 0e840e5..7717298 100644
--- a/be/src/exprs/conditional-functions.h
+++ b/be/src/exprs/conditional-functions.h
@@ -59,16 +59,16 @@ class ConditionalFunctions {
 
 class IsNullExpr : public Expr {
  public:
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow* row);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* context, TupleRow* row);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* context, TupleRow* row);
-  virtual IntVal GetIntVal(ExprContext* context, TupleRow* row);
-  virtual BigIntVal GetBigIntVal(ExprContext* context, TupleRow* row);
-  virtual FloatVal GetFloatVal(ExprContext* context, TupleRow* row);
-  virtual DoubleVal GetDoubleVal(ExprContext* context, TupleRow* row);
-  virtual StringVal GetStringVal(ExprContext* context, TupleRow* row);
-  virtual TimestampVal GetTimestampVal(ExprContext* context, TupleRow* row);
-  virtual DecimalVal GetDecimalVal(ExprContext* context, TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow* row);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow* 
row);
+  virtual IntVal GetIntVal(ExprContext* context, const TupleRow* row);
+  virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow* row);
+  virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow* row);
+  virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow* row);
+  virtual StringVal GetStringVal(ExprContext* context, const TupleRow* row);
+  virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow* 
row);
+  virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow* row);
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
   virtual std::string DebugString() const { return 
Expr::DebugString("IsNullExpr"); }
@@ -80,16 +80,16 @@ class IsNullExpr : public Expr {
 
 class NullIfExpr : public Expr {
  public:
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow* row);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* context, TupleRow* row);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* context, TupleRow* row);
-  virtual IntVal GetIntVal(ExprContext* context, TupleRow* row);
-  virtual BigIntVal GetBigIntVal(ExprContext* context, TupleRow* row);
-  virtual FloatVal GetFloatVal(ExprContext* context, TupleRow* row);
-  virtual DoubleVal GetDoubleVal(ExprContext* context, TupleRow* row);
-  virtual StringVal GetStringVal(ExprContext* context, TupleRow* row);
-  virtual TimestampVal GetTimestampVal(ExprContext* context, TupleRow* row);
-  virtual DecimalVal GetDecimalVal(ExprContext* context, TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow* row);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow* 
row);
+  virtual IntVal GetIntVal(ExprContext* context, const TupleRow* row);
+  virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow* row);
+  virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow* row);
+  virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow* row);
+  virtual StringVal GetStringVal(ExprContext* context, const TupleRow* row);
+  virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow* 
row);
+  virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow* row);
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
   virtual std::string DebugString() const { return 
Expr::DebugString("NullIfExpr"); }
@@ -101,16 +101,16 @@ class NullIfExpr : public Expr {
 
 class IfExpr : public Expr {
  public:
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow* row);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* context, TupleRow* row);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* context, TupleRow* row);
-  virtual IntVal GetIntVal(ExprContext* context, TupleRow* row);
-  virtual BigIntVal GetBigIntVal(ExprContext* context, TupleRow* row);
-  virtual FloatVal GetFloatVal(ExprContext* context, TupleRow* row);
-  virtual DoubleVal GetDoubleVal(ExprContext* context, TupleRow* row);
-  virtual StringVal GetStringVal(ExprContext* context, TupleRow* row);
-  virtual TimestampVal GetTimestampVal(ExprContext* context, TupleRow* row);
-  virtual DecimalVal GetDecimalVal(ExprContext* context, TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow* row);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow* 
row);
+  virtual IntVal GetIntVal(ExprContext* context, const TupleRow* row);
+  virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow* row);
+  virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow* row);
+  virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow* row);
+  virtual StringVal GetStringVal(ExprContext* context, const TupleRow* row);
+  virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow* 
row);
+  virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow* row);
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
   virtual std::string DebugString() const { return 
Expr::DebugString("IfExpr"); }
@@ -122,16 +122,16 @@ class IfExpr : public Expr {
 
 class CoalesceExpr : public Expr {
  public:
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow* row);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* context, TupleRow* row);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* context, TupleRow* row);
-  virtual IntVal GetIntVal(ExprContext* context, TupleRow* row);
-  virtual BigIntVal GetBigIntVal(ExprContext* context, TupleRow* row);
-  virtual FloatVal GetFloatVal(ExprContext* context, TupleRow* row);
-  virtual DoubleVal GetDoubleVal(ExprContext* context, TupleRow* row);
-  virtual StringVal GetStringVal(ExprContext* context, TupleRow* row);
-  virtual TimestampVal GetTimestampVal(ExprContext* context, TupleRow* row);
-  virtual DecimalVal GetDecimalVal(ExprContext* context, TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow* row);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow* 
row);
+  virtual IntVal GetIntVal(ExprContext* context, const TupleRow* row);
+  virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow* row);
+  virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow* row);
+  virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow* row);
+  virtual StringVal GetStringVal(ExprContext* context, const TupleRow* row);
+  virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow* 
row);
+  virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow* row);
 
   virtual std::string DebugString() const { return 
Expr::DebugString("CoalesceExpr"); }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/expr-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.cc b/be/src/exprs/expr-context.cc
index 4420394..c58f3d9 100644
--- a/be/src/exprs/expr-context.cc
+++ b/be/src/exprs/expr-context.cc
@@ -147,7 +147,7 @@ void ExprContext::FreeLocalAllocations(const 
vector<FunctionContext*>& fn_ctxs)
   }
 }
 
-void ExprContext::GetValue(TupleRow* row, bool as_ascii, TColumnValue* 
col_val) {
+void ExprContext::GetValue(const TupleRow* row, bool as_ascii, TColumnValue* 
col_val) {
   void* value = GetValue(row);
   if (as_ascii) {
     RawValue::PrintValue(value, root_->type_, root_->output_scale_, 
&col_val->string_val);
@@ -221,11 +221,11 @@ void ExprContext::GetValue(TupleRow* row, bool as_ascii, 
TColumnValue* col_val)
   }
 }
 
-void* ExprContext::GetValue(TupleRow* row) {
+void* ExprContext::GetValue(const TupleRow* row) {
   return GetValue(root_, row);
 }
 
-void* ExprContext::GetValue(Expr* e, TupleRow* row) {
+void* ExprContext::GetValue(Expr* e, const TupleRow* row) {
   switch (e->type_.type) {
     case TYPE_BOOLEAN: {
       impala_udf::BooleanVal v = e->GetBooleanVal(this, row);
@@ -326,7 +326,7 @@ void* ExprContext::GetValue(Expr* e, TupleRow* row) {
   }
 }
 
-void ExprContext::PrintValue(TupleRow* row, string* str) {
+void ExprContext::PrintValue(const TupleRow* row, string* str) {
   RawValue::PrintValue(GetValue(row), root_->type(), root_->output_scale_, 
str);
 }
 void ExprContext::PrintValue(void* value, string* str) {
@@ -335,7 +335,7 @@ void ExprContext::PrintValue(void* value, string* str) {
 void ExprContext::PrintValue(void* value, stringstream* stream) {
   RawValue::PrintValue(value, root_->type(), root_->output_scale_, stream);
 }
-void ExprContext::PrintValue(TupleRow* row, stringstream* stream) {
+void ExprContext::PrintValue(const TupleRow* row, stringstream* stream) {
   RawValue::PrintValue(GetValue(row), root_->type(), root_->output_scale_, 
stream);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 9078ce6..2023bdc 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -66,7 +66,7 @@ class ExprContext {
 
   /// Calls the appropriate Get*Val() function on this context's expr tree and 
stores the
   /// result in result_.
-  void* GetValue(TupleRow* row);
+  void* GetValue(const TupleRow* row);
 
   /// Convenience function: extract value into col_val and sets the
   /// appropriate __isset flag.
@@ -83,13 +83,13 @@ class ExprContext {
   /// TYPE_TIMESTAMP: stringVal
   /// Note: timestamp is converted to string via RawValue::PrintValue because 
HiveServer2
   /// requires timestamp in a string format.
-  void GetValue(TupleRow* row, bool as_ascii, TColumnValue* col_val);
+  void GetValue(const TupleRow* row, bool as_ascii, TColumnValue* col_val);
 
   /// Convenience functions: print value into 'str' or 'stream'.  NULL turns 
into "NULL".
-  void PrintValue(TupleRow* row, std::string* str);
+  void PrintValue(const TupleRow* row, std::string* str);
   void PrintValue(void* value, std::string* str);
   void PrintValue(void* value, std::stringstream* stream);
-  void PrintValue(TupleRow* row, std::stringstream* stream);
+  void PrintValue(const TupleRow* row, std::stringstream* stream);
 
   /// Creates a FunctionContext, and returns the index that's passed to 
fn_context() to
   /// retrieve the created context. Exprs that need a FunctionContext should 
call this in
@@ -172,7 +172,7 @@ class ExprContext {
 
   /// Calls the appropriate Get*Val() function on 'e' and stores the result in 
result_.
   /// This is used by Exprs to call GetValue() on a child expr, rather than 
root_.
-  void* GetValue(Expr* e, TupleRow* row);
+  void* GetValue(Expr* e, const TupleRow* row);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/expr-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-ir.cc b/be/src/exprs/expr-ir.cc
index dae49aa..ad324b0 100644
--- a/be/src/exprs/expr-ir.cc
+++ b/be/src/exprs/expr-ir.cc
@@ -39,33 +39,33 @@ using namespace impala_udf;
 // Static wrappers around Get*Val() functions. We'd like to be able to call 
these from
 // directly from native code as well as from generated IR functions.
 
-BooleanVal Expr::GetBooleanVal(Expr* expr, ExprContext* context, TupleRow* 
row) {
+BooleanVal Expr::GetBooleanVal(Expr* expr, ExprContext* context, const 
TupleRow* row) {
   return expr->GetBooleanVal(context, row);
 }
-TinyIntVal Expr::GetTinyIntVal(Expr* expr, ExprContext* context, TupleRow* 
row) {
+TinyIntVal Expr::GetTinyIntVal(Expr* expr, ExprContext* context, const 
TupleRow* row) {
   return expr->GetTinyIntVal(context, row);
 }
-SmallIntVal Expr::GetSmallIntVal(Expr* expr, ExprContext* context, TupleRow* 
row) {
+SmallIntVal Expr::GetSmallIntVal(Expr* expr, ExprContext* context, const 
TupleRow* row) {
   return expr->GetSmallIntVal(context, row);
 }
-IntVal Expr::GetIntVal(Expr* expr, ExprContext* context, TupleRow* row) {
+IntVal Expr::GetIntVal(Expr* expr, ExprContext* context, const TupleRow* row) {
   return expr->GetIntVal(context, row);
 }
-BigIntVal Expr::GetBigIntVal(Expr* expr, ExprContext* context, TupleRow* row) {
+BigIntVal Expr::GetBigIntVal(Expr* expr, ExprContext* context, const TupleRow* 
row) {
   return expr->GetBigIntVal(context, row);
 }
-FloatVal Expr::GetFloatVal(Expr* expr, ExprContext* context, TupleRow* row) {
+FloatVal Expr::GetFloatVal(Expr* expr, ExprContext* context, const TupleRow* 
row) {
   return expr->GetFloatVal(context, row);
 }
-DoubleVal Expr::GetDoubleVal(Expr* expr, ExprContext* context, TupleRow* row) {
+DoubleVal Expr::GetDoubleVal(Expr* expr, ExprContext* context, const TupleRow* 
row) {
   return expr->GetDoubleVal(context, row);
 }
-StringVal Expr::GetStringVal(Expr* expr, ExprContext* context, TupleRow* row) {
+StringVal Expr::GetStringVal(Expr* expr, ExprContext* context, const TupleRow* 
row) {
   return expr->GetStringVal(context, row);
 }
-TimestampVal Expr::GetTimestampVal(Expr* expr, ExprContext* context, TupleRow* 
row) {
+TimestampVal Expr::GetTimestampVal(Expr* expr, ExprContext* context, const 
TupleRow* row) {
   return expr->GetTimestampVal(context, row);
 }
-DecimalVal Expr::GetDecimalVal(Expr* expr, ExprContext* context, TupleRow* 
row) {
+DecimalVal Expr::GetDecimalVal(Expr* expr, ExprContext* context, const 
TupleRow* row) {
   return expr->GetDecimalVal(context, row);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/expr.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.cc b/be/src/exprs/expr.cc
index 0156edc..ca81465 100644
--- a/be/src/exprs/expr.cc
+++ b/be/src/exprs/expr.cc
@@ -678,47 +678,47 @@ Status Expr::GetCodegendComputeFnWrapper(RuntimeState* 
state, Function** fn) {
 }
 
 // At least one of these should always be subclassed.
-BooleanVal Expr::GetBooleanVal(ExprContext* context, TupleRow* row) {
+BooleanVal Expr::GetBooleanVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return BooleanVal::null();
 }
-TinyIntVal Expr::GetTinyIntVal(ExprContext* context, TupleRow* row) {
+TinyIntVal Expr::GetTinyIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return TinyIntVal::null();
 }
-SmallIntVal Expr::GetSmallIntVal(ExprContext* context, TupleRow* row) {
+SmallIntVal Expr::GetSmallIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return SmallIntVal::null();
 }
-IntVal Expr::GetIntVal(ExprContext* context, TupleRow* row) {
+IntVal Expr::GetIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return IntVal::null();
 }
-BigIntVal Expr::GetBigIntVal(ExprContext* context, TupleRow* row) {
+BigIntVal Expr::GetBigIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return BigIntVal::null();
 }
-FloatVal Expr::GetFloatVal(ExprContext* context, TupleRow* row) {
+FloatVal Expr::GetFloatVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return FloatVal::null();
 }
-DoubleVal Expr::GetDoubleVal(ExprContext* context, TupleRow* row) {
+DoubleVal Expr::GetDoubleVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return DoubleVal::null();
 }
-StringVal Expr::GetStringVal(ExprContext* context, TupleRow* row) {
+StringVal Expr::GetStringVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return StringVal::null();
 }
-CollectionVal Expr::GetCollectionVal(ExprContext* context, TupleRow* row) {
+CollectionVal Expr::GetCollectionVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK(false) << DebugString();
   return CollectionVal::null();
 }
-TimestampVal Expr::GetTimestampVal(ExprContext* context, TupleRow* row) {
+TimestampVal Expr::GetTimestampVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return TimestampVal::null();
 }
-DecimalVal Expr::GetDecimalVal(ExprContext* context, TupleRow* row) {
+DecimalVal Expr::GetDecimalVal(ExprContext* context, const TupleRow* row) {
   DCHECK(false) << DebugString();
   return DecimalVal::null();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/expr.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index e0c55b6..3720236 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -19,7 +19,7 @@
 /// and produces a scalar result. This function evaluates the necessary child 
arguments by
 /// calling their compute functions, then performs whatever computation is 
necessary on
 /// the arguments (e.g. calling a UDF with the child arguments). All compute 
functions
-/// take arguments (ExprContext*, TupleRow*). The return type is a *Val (i.e. 
a subclass
+/// take arguments (ExprContext*, const TupleRow*). The return type is a *Val 
(i.e. a subclass
 /// of AnyVal). Thus, a single expression will implement a compute function 
for every
 /// return type it supports.
 ///
@@ -130,17 +130,17 @@ class Expr {
   /// the functions for the return type(s) it supports. For example, a boolean 
function
   /// will only implement GetBooleanVal(). Some Exprs, like Literal, have many 
possible
   /// return types and will implement multiple Get*Val() functions.
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow*);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* context, TupleRow*);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* context, TupleRow*);
-  virtual IntVal GetIntVal(ExprContext* context, TupleRow*);
-  virtual BigIntVal GetBigIntVal(ExprContext* context, TupleRow*);
-  virtual FloatVal GetFloatVal(ExprContext* context, TupleRow*);
-  virtual DoubleVal GetDoubleVal(ExprContext* context, TupleRow*);
-  virtual StringVal GetStringVal(ExprContext* context, TupleRow*);
-  virtual CollectionVal GetCollectionVal(ExprContext* context, TupleRow*);
-  virtual TimestampVal GetTimestampVal(ExprContext* context, TupleRow*);
-  virtual DecimalVal GetDecimalVal(ExprContext* context, TupleRow*);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow*);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow*);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow*);
+  virtual IntVal GetIntVal(ExprContext* context, const TupleRow*);
+  virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow*);
+  virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow*);
+  virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow*);
+  virtual StringVal GetStringVal(ExprContext* context, const TupleRow*);
+  virtual CollectionVal GetCollectionVal(ExprContext* context, const 
TupleRow*);
+  virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow*);
+  virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow*);
 
   /// Get the number of digits after the decimal that should be displayed for 
this value.
   /// Returns -1 if no scale has been specified (currently the scale is only 
set for
@@ -227,7 +227,7 @@ class Expr {
       std::vector<int>* offsets, int* var_result_begin);
 
   /// Returns an llvm::Function* with signature:
-  /// <subclass of AnyVal> ComputeFn(ExprContext* context, TupleRow* row)
+  /// <subclass of AnyVal> ComputeFn(ExprContext* context, const TupleRow* row)
   //
   /// The function should evaluate this expr over 'row' and return the result 
as the
   /// appropriate type of AnyVal.
@@ -431,16 +431,16 @@ class Expr {
   /// These are used to call Get*Val() functions from generated functions, 
since I don't
   /// know how to call virtual functions directly. GetStaticGetValWrapper() 
returns the
   /// IR function of the appropriate wrapper function.
-  static BooleanVal GetBooleanVal(Expr* expr, ExprContext* context, TupleRow* 
row);
-  static TinyIntVal GetTinyIntVal(Expr* expr, ExprContext* context, TupleRow* 
row);
-  static SmallIntVal GetSmallIntVal(Expr* expr, ExprContext* context, 
TupleRow* row);
-  static IntVal GetIntVal(Expr* expr, ExprContext* context, TupleRow* row);
-  static BigIntVal GetBigIntVal(Expr* expr, ExprContext* context, TupleRow* 
row);
-  static FloatVal GetFloatVal(Expr* expr, ExprContext* context, TupleRow* row);
-  static DoubleVal GetDoubleVal(Expr* expr, ExprContext* context, TupleRow* 
row);
-  static StringVal GetStringVal(Expr* expr, ExprContext* context, TupleRow* 
row);
-  static TimestampVal GetTimestampVal(Expr* expr, ExprContext* context, 
TupleRow* row);
-  static DecimalVal GetDecimalVal(Expr* expr, ExprContext* context, TupleRow* 
row);
+  static BooleanVal GetBooleanVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static TinyIntVal GetTinyIntVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static SmallIntVal GetSmallIntVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static IntVal GetIntVal(Expr* expr, ExprContext* context, const TupleRow* 
row);
+  static BigIntVal GetBigIntVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static FloatVal GetFloatVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static DoubleVal GetDoubleVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static StringVal GetStringVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static TimestampVal GetTimestampVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
+  static DecimalVal GetDecimalVal(Expr* expr, ExprContext* context, const 
TupleRow* row);
 
 
   // Helper function for GetConstantInt() and InlineConstants(): return the 
constant value

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index 660c027..50cd0d7 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -71,7 +71,7 @@ HiveUdfCall::HiveUdfCall(const TExprNode& node)
   DCHECK(executor_cl_ != NULL) << "Init() was not called!";
 }
 
-AnyVal* HiveUdfCall::Evaluate(ExprContext* ctx, TupleRow* row) {
+AnyVal* HiveUdfCall::Evaluate(ExprContext* ctx, const TupleRow* row) {
   FunctionContext* fn_ctx = ctx->fn_context(fn_context_index_);
   JniContext* jni_ctx = reinterpret_cast<JniContext*>(
       fn_ctx->GetFunctionState(FunctionContext::THREAD_LOCAL));
@@ -286,52 +286,52 @@ string HiveUdfCall::DebugString() const {
   return out.str();
 }
 
-BooleanVal HiveUdfCall::GetBooleanVal(ExprContext* ctx, TupleRow* row) {
+BooleanVal HiveUdfCall::GetBooleanVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_BOOLEAN);
   return *reinterpret_cast<BooleanVal*>(Evaluate(ctx, row));
 }
 
-TinyIntVal HiveUdfCall::GetTinyIntVal(ExprContext* ctx, TupleRow* row) {
+TinyIntVal HiveUdfCall::GetTinyIntVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_TINYINT);
   return *reinterpret_cast<TinyIntVal*>(Evaluate(ctx, row));
 }
 
-SmallIntVal HiveUdfCall::GetSmallIntVal(ExprContext* ctx, TupleRow* row) {
+SmallIntVal HiveUdfCall::GetSmallIntVal(ExprContext* ctx, const TupleRow* row) 
{
   DCHECK_EQ(type_.type, TYPE_SMALLINT);
   return * reinterpret_cast<SmallIntVal*>(Evaluate(ctx, row));
 }
 
-IntVal HiveUdfCall::GetIntVal(ExprContext* ctx, TupleRow* row) {
+IntVal HiveUdfCall::GetIntVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_INT);
   return *reinterpret_cast<IntVal*>(Evaluate(ctx, row));
 }
 
-BigIntVal HiveUdfCall::GetBigIntVal(ExprContext* ctx, TupleRow* row) {
+BigIntVal HiveUdfCall::GetBigIntVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_BIGINT);
   return *reinterpret_cast<BigIntVal*>(Evaluate(ctx, row));
 }
 
-FloatVal HiveUdfCall::GetFloatVal(ExprContext* ctx, TupleRow* row) {
+FloatVal HiveUdfCall::GetFloatVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_FLOAT);
   return *reinterpret_cast<FloatVal*>(Evaluate(ctx, row));
 }
 
-DoubleVal HiveUdfCall::GetDoubleVal(ExprContext* ctx, TupleRow* row) {
+DoubleVal HiveUdfCall::GetDoubleVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_DOUBLE);
   return *reinterpret_cast<DoubleVal*>(Evaluate(ctx, row));
 }
 
-StringVal HiveUdfCall::GetStringVal(ExprContext* ctx, TupleRow* row) {
+StringVal HiveUdfCall::GetStringVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_STRING);
   return *reinterpret_cast<StringVal*>(Evaluate(ctx, row));
 }
 
-TimestampVal HiveUdfCall::GetTimestampVal(ExprContext* ctx, TupleRow* row) {
+TimestampVal HiveUdfCall::GetTimestampVal(ExprContext* ctx, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_TIMESTAMP);
   return *reinterpret_cast<TimestampVal*>(Evaluate(ctx, row));
 }
 
-DecimalVal HiveUdfCall::GetDecimalVal(ExprContext* ctx, TupleRow* row) {
+DecimalVal HiveUdfCall::GetDecimalVal(ExprContext* ctx, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_DECIMAL);
   return *reinterpret_cast<DecimalVal*>(Evaluate(ctx, row));
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/hive-udf-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.h b/be/src/exprs/hive-udf-call.h
index 2a35113..688bcc7 100644
--- a/be/src/exprs/hive-udf-call.h
+++ b/be/src/exprs/hive-udf-call.h
@@ -68,16 +68,16 @@ class HiveUdfCall : public Expr {
   virtual void Close(RuntimeState* state, ExprContext* context,
       FunctionContext::FunctionStateScope scope = 
FunctionContext::FRAGMENT_LOCAL);
 
-  virtual BooleanVal GetBooleanVal(ExprContext* ctx, TupleRow*);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* ctx, TupleRow*);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* ctx, TupleRow*);
-  virtual IntVal GetIntVal(ExprContext* ctx, TupleRow*);
-  virtual BigIntVal GetBigIntVal(ExprContext* ctx, TupleRow*);
-  virtual FloatVal GetFloatVal(ExprContext* ctx, TupleRow*);
-  virtual DoubleVal GetDoubleVal(ExprContext* ctx, TupleRow*);
-  virtual StringVal GetStringVal(ExprContext* ctx, TupleRow*);
-  virtual TimestampVal GetTimestampVal(ExprContext* ctx, TupleRow*);
-  virtual DecimalVal GetDecimalVal(ExprContext* ctx, TupleRow*);
+  virtual BooleanVal GetBooleanVal(ExprContext* ctx, const TupleRow*);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* ctx, const TupleRow*);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* ctx, const TupleRow*);
+  virtual IntVal GetIntVal(ExprContext* ctx, const TupleRow*);
+  virtual BigIntVal GetBigIntVal(ExprContext* ctx, const TupleRow*);
+  virtual FloatVal GetFloatVal(ExprContext* ctx, const TupleRow*);
+  virtual DoubleVal GetDoubleVal(ExprContext* ctx, const TupleRow*);
+  virtual StringVal GetStringVal(ExprContext* ctx, const TupleRow*);
+  virtual TimestampVal GetTimestampVal(ExprContext* ctx, const TupleRow*);
+  virtual DecimalVal GetDecimalVal(ExprContext* ctx, const TupleRow*);
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
 
@@ -92,7 +92,7 @@ class HiveUdfCall : public Expr {
   /// Evalutes the UDF over row. Returns the result as an AnyVal. This function
   /// never returns NULL but rather an AnyVal object with is_null set to true 
on
   /// error.
-  AnyVal* Evaluate(ExprContext* ctx, TupleRow* row);
+  AnyVal* Evaluate(ExprContext* ctx, const TupleRow* row);
 
   /// The path on the local FS to the UDF's jar
   std::string local_location_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/is-not-empty-predicate.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/is-not-empty-predicate.cc 
b/be/src/exprs/is-not-empty-predicate.cc
index bba2efa..1eebc12 100644
--- a/be/src/exprs/is-not-empty-predicate.cc
+++ b/be/src/exprs/is-not-empty-predicate.cc
@@ -26,7 +26,7 @@ IsNotEmptyPredicate::IsNotEmptyPredicate(const TExprNode& 
node)
   : Predicate(node) {
 }
 
-BooleanVal IsNotEmptyPredicate::GetBooleanVal(ExprContext* ctx, TupleRow* row) 
{
+BooleanVal IsNotEmptyPredicate::GetBooleanVal(ExprContext* ctx, const 
TupleRow* row) {
   CollectionVal coll = children_[0]->GetCollectionVal(ctx, row);
   if (coll.is_null) return BooleanVal::null();
   return BooleanVal(coll.num_tuples != 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/is-not-empty-predicate.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/is-not-empty-predicate.h 
b/be/src/exprs/is-not-empty-predicate.h
index 3eed20e..7f2df17 100644
--- a/be/src/exprs/is-not-empty-predicate.h
+++ b/be/src/exprs/is-not-empty-predicate.h
@@ -29,7 +29,7 @@ class IsNotEmptyPredicate: public Predicate {
   virtual Status Prepare(RuntimeState* state, const RowDescriptor& row_desc,
                          ExprContext* ctx);
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row);
   virtual std::string DebugString() const;
 
  protected:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/literal.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/literal.cc b/be/src/exprs/literal.cc
index 340403e..b4a0fdf 100644
--- a/be/src/exprs/literal.cc
+++ b/be/src/exprs/literal.cc
@@ -260,49 +260,49 @@ Literal* Literal::CreateLiteral(const ColumnType& type, 
const string& str) {
   }
 }
 
-BooleanVal Literal::GetBooleanVal(ExprContext* context, TupleRow* row) {
+BooleanVal Literal::GetBooleanVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_BOOLEAN) << type_;
   return BooleanVal(value_.bool_val);
 }
 
-TinyIntVal Literal::GetTinyIntVal(ExprContext* context, TupleRow* row) {
+TinyIntVal Literal::GetTinyIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_TINYINT) << type_;
   return TinyIntVal(value_.tinyint_val);
 }
 
-SmallIntVal Literal::GetSmallIntVal(ExprContext* context, TupleRow* row) {
+SmallIntVal Literal::GetSmallIntVal(ExprContext* context, const TupleRow* row) 
{
   DCHECK_EQ(type_.type, TYPE_SMALLINT) << type_;
   return SmallIntVal(value_.smallint_val);
 }
 
-IntVal Literal::GetIntVal(ExprContext* context, TupleRow* row) {
+IntVal Literal::GetIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_INT) << type_;
   return IntVal(value_.int_val);
 }
 
-BigIntVal Literal::GetBigIntVal(ExprContext* context, TupleRow* row) {
+BigIntVal Literal::GetBigIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_BIGINT) << type_;
   return BigIntVal(value_.bigint_val);
 }
 
-FloatVal Literal::GetFloatVal(ExprContext* context, TupleRow* row) {
+FloatVal Literal::GetFloatVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_FLOAT) << type_;
   return FloatVal(value_.float_val);
 }
 
-DoubleVal Literal::GetDoubleVal(ExprContext* context, TupleRow* row) {
+DoubleVal Literal::GetDoubleVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_DOUBLE) << type_;
   return DoubleVal(value_.double_val);
 }
 
-StringVal Literal::GetStringVal(ExprContext* context, TupleRow* row) {
+StringVal Literal::GetStringVal(ExprContext* context, const TupleRow* row) {
   DCHECK(type_.IsStringType()) << type_;
   StringVal result;
   value_.string_val.ToStringVal(&result);
   return result;
 }
 
-DecimalVal Literal::GetDecimalVal(ExprContext* context, TupleRow* row) {
+DecimalVal Literal::GetDecimalVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_DECIMAL) << type_;
   switch (type().GetByteSize()) {
     case 4:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/literal.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h
index d3e4277..2400293 100644
--- a/be/src/exprs/literal.h
+++ b/be/src/exprs/literal.h
@@ -44,15 +44,15 @@ class Literal: public Expr {
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
 
-  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext*, TupleRow*);
-  virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::IntVal GetIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::BigIntVal GetBigIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::FloatVal GetFloatVal(ExprContext*, TupleRow*);
-  virtual impala_udf::DoubleVal GetDoubleVal(ExprContext*, TupleRow*);
-  virtual impala_udf::StringVal GetStringVal(ExprContext*, TupleRow*);
-  virtual impala_udf::DecimalVal GetDecimalVal(ExprContext*, TupleRow*);
+  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext*, const 
TupleRow*);
+  virtual impala_udf::IntVal GetIntVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::BigIntVal GetBigIntVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::FloatVal GetFloatVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::DoubleVal GetDoubleVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::StringVal GetStringVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::DecimalVal GetDecimalVal(ExprContext*, const TupleRow*);
 
  protected:
   friend class Expr;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/null-literal.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/null-literal.cc b/be/src/exprs/null-literal.cc
index aa56034..ffb72dd 100644
--- a/be/src/exprs/null-literal.cc
+++ b/be/src/exprs/null-literal.cc
@@ -27,57 +27,57 @@ using namespace llvm;
 
 namespace impala {
 
-BooleanVal NullLiteral::GetBooleanVal(ExprContext* context, TupleRow* row) {
+BooleanVal NullLiteral::GetBooleanVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_BOOLEAN) << type_;
   return BooleanVal::null();
 }
 
-TinyIntVal NullLiteral::GetTinyIntVal(ExprContext* context, TupleRow* row) {
+TinyIntVal NullLiteral::GetTinyIntVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_TINYINT) << type_;
   return TinyIntVal::null();
 }
 
-SmallIntVal NullLiteral::GetSmallIntVal(ExprContext* context, TupleRow* row) {
+SmallIntVal NullLiteral::GetSmallIntVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_SMALLINT) << type_;
   return SmallIntVal::null();
 }
 
-IntVal NullLiteral::GetIntVal(ExprContext* context, TupleRow* row) {
+IntVal NullLiteral::GetIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_INT) << type_;
   return IntVal::null();
 }
 
-BigIntVal NullLiteral::GetBigIntVal(ExprContext* context, TupleRow* row) {
+BigIntVal NullLiteral::GetBigIntVal(ExprContext* context, const TupleRow* row) 
{
   DCHECK_EQ(type_.type, TYPE_BIGINT) << type_;
   return BigIntVal::null();
 }
 
-FloatVal NullLiteral::GetFloatVal(ExprContext* context, TupleRow* row) {
+FloatVal NullLiteral::GetFloatVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_FLOAT) << type_;
   return FloatVal::null();
 }
 
-DoubleVal NullLiteral::GetDoubleVal(ExprContext* context, TupleRow* row) {
+DoubleVal NullLiteral::GetDoubleVal(ExprContext* context, const TupleRow* row) 
{
   DCHECK_EQ(type_.type, TYPE_DOUBLE) << type_;
   return DoubleVal::null();
 }
 
-StringVal NullLiteral::GetStringVal(ExprContext* context, TupleRow* row) {
+StringVal NullLiteral::GetStringVal(ExprContext* context, const TupleRow* row) 
{
   DCHECK(type_.IsStringType()) << type_;
   return StringVal::null();
 }
 
-TimestampVal NullLiteral::GetTimestampVal(ExprContext* context, TupleRow* row) 
{
+TimestampVal NullLiteral::GetTimestampVal(ExprContext* context, const 
TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_TIMESTAMP) << type_;
   return TimestampVal::null();
 }
 
-DecimalVal NullLiteral::GetDecimalVal(ExprContext* context, TupleRow* row) {
+DecimalVal NullLiteral::GetDecimalVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_DECIMAL) << type_;
   return DecimalVal::null();
 }
 
-CollectionVal NullLiteral::GetCollectionVal(ExprContext* context, TupleRow* 
row) {
+CollectionVal NullLiteral::GetCollectionVal(ExprContext* context, const 
TupleRow* row) {
   DCHECK(type_.IsCollectionType());
   return CollectionVal::null();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/null-literal.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/null-literal.h b/be/src/exprs/null-literal.h
index f474fec..4befce6 100644
--- a/be/src/exprs/null-literal.h
+++ b/be/src/exprs/null-literal.h
@@ -27,17 +27,17 @@ class NullLiteral: public Expr {
   NullLiteral(PrimitiveType type) : Expr(type) { }
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
 
-  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext*, TupleRow*);
-  virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::IntVal GetIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::BigIntVal GetBigIntVal(ExprContext*, TupleRow*);
-  virtual impala_udf::FloatVal GetFloatVal(ExprContext*, TupleRow*);
-  virtual impala_udf::DoubleVal GetDoubleVal(ExprContext*, TupleRow*);
-  virtual impala_udf::StringVal GetStringVal(ExprContext*, TupleRow*);
-  virtual impala_udf::TimestampVal GetTimestampVal(ExprContext*, TupleRow*);
-  virtual impala_udf::DecimalVal GetDecimalVal(ExprContext*, TupleRow*);
-  virtual impala_udf::CollectionVal GetCollectionVal(ExprContext*, TupleRow*);
+  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext*, const 
TupleRow*);
+  virtual impala_udf::IntVal GetIntVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::BigIntVal GetBigIntVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::FloatVal GetFloatVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::DoubleVal GetDoubleVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::StringVal GetStringVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::TimestampVal GetTimestampVal(ExprContext*, const 
TupleRow*);
+  virtual impala_udf::DecimalVal GetDecimalVal(ExprContext*, const TupleRow*);
+  virtual impala_udf::CollectionVal GetCollectionVal(ExprContext*, const 
TupleRow*);
 
   virtual std::string DebugString() const;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/scalar-fn-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.cc b/be/src/exprs/scalar-fn-call.cc
index 4f91c6c..9a35ba1 100644
--- a/be/src/exprs/scalar-fn-call.cc
+++ b/be/src/exprs/scalar-fn-call.cc
@@ -527,7 +527,7 @@ Status ScalarFnCall::GetFunction(RuntimeState* state, const 
string& symbol, void
   }
 }
 
-void ScalarFnCall::EvaluateChildren(ExprContext* context, TupleRow* row,
+void ScalarFnCall::EvaluateChildren(ExprContext* context, const TupleRow* row,
                                     vector<AnyVal*>* input_vals) {
   DCHECK_EQ(input_vals->size(), NumFixedArgs());
   FunctionContext* fn_ctx = context->fn_context(fn_context_index_);
@@ -546,7 +546,7 @@ void ScalarFnCall::EvaluateChildren(ExprContext* context, 
TupleRow* row,
 }
 
 template<typename RETURN_TYPE>
-RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* context, TupleRow* row) {
+RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* context, const TupleRow* 
row) {
   DCHECK(scalar_fn_ != NULL) << DebugString();
   FunctionContext* fn_ctx = context->fn_context(fn_context_index_);
   vector<AnyVal*>* input_vals = fn_ctx->impl()->staging_input_vals();
@@ -675,19 +675,19 @@ RETURN_TYPE ScalarFnCall::InterpretEval(ExprContext* 
context, TupleRow* row) {
   return RETURN_TYPE::null();
 }
 
-typedef BooleanVal (*BooleanWrapper)(ExprContext*, TupleRow*);
-typedef TinyIntVal (*TinyIntWrapper)(ExprContext*, TupleRow*);
-typedef SmallIntVal (*SmallIntWrapper)(ExprContext*, TupleRow*);
-typedef IntVal (*IntWrapper)(ExprContext*, TupleRow*);
-typedef BigIntVal (*BigIntWrapper)(ExprContext*, TupleRow*);
-typedef FloatVal (*FloatWrapper)(ExprContext*, TupleRow*);
-typedef DoubleVal (*DoubleWrapper)(ExprContext*, TupleRow*);
-typedef StringVal (*StringWrapper)(ExprContext*, TupleRow*);
-typedef TimestampVal (*TimestampWrapper)(ExprContext*, TupleRow*);
-typedef DecimalVal (*DecimalWrapper)(ExprContext*, TupleRow*);
+typedef BooleanVal (*BooleanWrapper)(ExprContext*, const TupleRow*);
+typedef TinyIntVal (*TinyIntWrapper)(ExprContext*, const TupleRow*);
+typedef SmallIntVal (*SmallIntWrapper)(ExprContext*, const TupleRow*);
+typedef IntVal (*IntWrapper)(ExprContext*, const TupleRow*);
+typedef BigIntVal (*BigIntWrapper)(ExprContext*, const TupleRow*);
+typedef FloatVal (*FloatWrapper)(ExprContext*, const TupleRow*);
+typedef DoubleVal (*DoubleWrapper)(ExprContext*, const TupleRow*);
+typedef StringVal (*StringWrapper)(ExprContext*, const TupleRow*);
+typedef TimestampVal (*TimestampWrapper)(ExprContext*, const TupleRow*);
+typedef DecimalVal (*DecimalWrapper)(ExprContext*, const TupleRow*);
 
 // TODO: macroify this?
-BooleanVal ScalarFnCall::GetBooleanVal(ExprContext* context, TupleRow* row) {
+BooleanVal ScalarFnCall::GetBooleanVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_BOOLEAN);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<BooleanVal>(context, 
row);
@@ -695,7 +695,7 @@ BooleanVal ScalarFnCall::GetBooleanVal(ExprContext* 
context, TupleRow* row) {
   return fn(context, row);
 }
 
-TinyIntVal ScalarFnCall::GetTinyIntVal(ExprContext* context, TupleRow* row) {
+TinyIntVal ScalarFnCall::GetTinyIntVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_TINYINT);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<TinyIntVal>(context, 
row);
@@ -703,7 +703,7 @@ TinyIntVal ScalarFnCall::GetTinyIntVal(ExprContext* 
context, TupleRow* row) {
   return fn(context, row);
 }
 
-SmallIntVal ScalarFnCall::GetSmallIntVal(ExprContext* context, TupleRow* row) {
+SmallIntVal ScalarFnCall::GetSmallIntVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_SMALLINT);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<SmallIntVal>(context, 
row);
@@ -711,7 +711,7 @@ SmallIntVal ScalarFnCall::GetSmallIntVal(ExprContext* 
context, TupleRow* row) {
   return fn(context, row);
 }
 
-IntVal ScalarFnCall::GetIntVal(ExprContext* context, TupleRow* row) {
+IntVal ScalarFnCall::GetIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_INT);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<IntVal>(context, row);
@@ -719,7 +719,7 @@ IntVal ScalarFnCall::GetIntVal(ExprContext* context, 
TupleRow* row) {
   return fn(context, row);
 }
 
-BigIntVal ScalarFnCall::GetBigIntVal(ExprContext* context, TupleRow* row) {
+BigIntVal ScalarFnCall::GetBigIntVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_BIGINT);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<BigIntVal>(context, 
row);
@@ -727,7 +727,7 @@ BigIntVal ScalarFnCall::GetBigIntVal(ExprContext* context, 
TupleRow* row) {
   return fn(context, row);
 }
 
-FloatVal ScalarFnCall::GetFloatVal(ExprContext* context, TupleRow* row) {
+FloatVal ScalarFnCall::GetFloatVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_FLOAT);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<FloatVal>(context, row);
@@ -735,7 +735,7 @@ FloatVal ScalarFnCall::GetFloatVal(ExprContext* context, 
TupleRow* row) {
   return fn(context, row);
 }
 
-DoubleVal ScalarFnCall::GetDoubleVal(ExprContext* context, TupleRow* row) {
+DoubleVal ScalarFnCall::GetDoubleVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_DOUBLE);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<DoubleVal>(context, 
row);
@@ -743,7 +743,7 @@ DoubleVal ScalarFnCall::GetDoubleVal(ExprContext* context, 
TupleRow* row) {
   return fn(context, row);
 }
 
-StringVal ScalarFnCall::GetStringVal(ExprContext* context, TupleRow* row) {
+StringVal ScalarFnCall::GetStringVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK(type_.IsStringType());
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<StringVal>(context, 
row);
@@ -751,7 +751,7 @@ StringVal ScalarFnCall::GetStringVal(ExprContext* context, 
TupleRow* row) {
   return fn(context, row);
 }
 
-TimestampVal ScalarFnCall::GetTimestampVal(ExprContext* context, TupleRow* 
row) {
+TimestampVal ScalarFnCall::GetTimestampVal(ExprContext* context, const 
TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_TIMESTAMP);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<TimestampVal>(context, 
row);
@@ -759,7 +759,7 @@ TimestampVal ScalarFnCall::GetTimestampVal(ExprContext* 
context, TupleRow* row)
   return fn(context, row);
 }
 
-DecimalVal ScalarFnCall::GetDecimalVal(ExprContext* context, TupleRow* row) {
+DecimalVal ScalarFnCall::GetDecimalVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_DECIMAL);
   DCHECK(context != NULL);
   if (scalar_fn_wrapper_ == NULL) return InterpretEval<DecimalVal>(context, 
row);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/scalar-fn-call.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-fn-call.h b/be/src/exprs/scalar-fn-call.h
index 01ff1e2..7dca86e 100644
--- a/be/src/exprs/scalar-fn-call.h
+++ b/be/src/exprs/scalar-fn-call.h
@@ -63,16 +63,16 @@ class ScalarFnCall: public Expr {
 
   virtual bool IsConstant() const;
 
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow*);
-  virtual TinyIntVal GetTinyIntVal(ExprContext* context, TupleRow*);
-  virtual SmallIntVal GetSmallIntVal(ExprContext* context, TupleRow*);
-  virtual IntVal GetIntVal(ExprContext* context, TupleRow*);
-  virtual BigIntVal GetBigIntVal(ExprContext* context, TupleRow*);
-  virtual FloatVal GetFloatVal(ExprContext* context, TupleRow*);
-  virtual DoubleVal GetDoubleVal(ExprContext* context, TupleRow*);
-  virtual StringVal GetStringVal(ExprContext* context, TupleRow*);
-  virtual TimestampVal GetTimestampVal(ExprContext* context, TupleRow*);
-  virtual DecimalVal GetDecimalVal(ExprContext* context, TupleRow*);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow*);
+  virtual TinyIntVal GetTinyIntVal(ExprContext* context, const TupleRow*);
+  virtual SmallIntVal GetSmallIntVal(ExprContext* context, const TupleRow*);
+  virtual IntVal GetIntVal(ExprContext* context, const TupleRow*);
+  virtual BigIntVal GetBigIntVal(ExprContext* context, const TupleRow*);
+  virtual FloatVal GetFloatVal(ExprContext* context, const TupleRow*);
+  virtual DoubleVal GetDoubleVal(ExprContext* context, const TupleRow*);
+  virtual StringVal GetStringVal(ExprContext* context, const TupleRow*);
+  virtual TimestampVal GetTimestampVal(ExprContext* context, const TupleRow*);
+  virtual DecimalVal GetDecimalVal(ExprContext* context, const TupleRow*);
 
  private:
   /// If this function has var args, children()[vararg_start_idx_] is the 
first vararg
@@ -81,7 +81,7 @@ class ScalarFnCall: public Expr {
   int vararg_start_idx_;
 
   /// Function pointer to the JIT'd function produced by 
GetCodegendComputeFn().
-  /// Has signature *Val (ExprContext*, TupleRow*), and calls the scalar
+  /// Has signature *Val (ExprContext*, const TupleRow*), and calls the scalar
   /// function with signature like *Val (FunctionContext*, const *Val& arg1, 
...)
   void* scalar_fn_wrapper_;
 
@@ -113,12 +113,12 @@ class ScalarFnCall: public Expr {
 
   /// Evaluates the children exprs and stores the results in input_vals. Used 
in the
   /// interpreted path.
-  void EvaluateChildren(ExprContext* context, TupleRow* row,
+  void EvaluateChildren(ExprContext* context, const TupleRow* row,
                         std::vector<impala_udf::AnyVal*>* input_vals);
 
   /// Function to call scalar_fn_. Used in the interpreted path.
   template<typename RETURN_TYPE>
-  RETURN_TYPE InterpretEval(ExprContext* context, TupleRow* row);
+  RETURN_TYPE InterpretEval(ExprContext* context, const TupleRow* row);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/slot-ref.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.cc b/be/src/exprs/slot-ref.cc
index 59c2b78..e5dbc34 100644
--- a/be/src/exprs/slot-ref.cc
+++ b/be/src/exprs/slot-ref.cc
@@ -368,56 +368,56 @@ Status SlotRef::GetCodegendComputeFn(RuntimeState* state, 
llvm::Function** fn) {
   return Status::OK();
 }
 
-BooleanVal SlotRef::GetBooleanVal(ExprContext* context, TupleRow* row) {
+BooleanVal SlotRef::GetBooleanVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_BOOLEAN);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
BooleanVal::null();
   return BooleanVal(*reinterpret_cast<bool*>(t->GetSlot(slot_offset_)));
 }
 
-TinyIntVal SlotRef::GetTinyIntVal(ExprContext* context, TupleRow* row) {
+TinyIntVal SlotRef::GetTinyIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_TINYINT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
TinyIntVal::null();
   return TinyIntVal(*reinterpret_cast<int8_t*>(t->GetSlot(slot_offset_)));
 }
 
-SmallIntVal SlotRef::GetSmallIntVal(ExprContext* context, TupleRow* row) {
+SmallIntVal SlotRef::GetSmallIntVal(ExprContext* context, const TupleRow* row) 
{
   DCHECK_EQ(type_.type, TYPE_SMALLINT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
SmallIntVal::null();
   return SmallIntVal(*reinterpret_cast<int16_t*>(t->GetSlot(slot_offset_)));
 }
 
-IntVal SlotRef::GetIntVal(ExprContext* context, TupleRow* row) {
+IntVal SlotRef::GetIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_INT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return IntVal::null();
   return IntVal(*reinterpret_cast<int32_t*>(t->GetSlot(slot_offset_)));
 }
 
-BigIntVal SlotRef::GetBigIntVal(ExprContext* context, TupleRow* row) {
+BigIntVal SlotRef::GetBigIntVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_BIGINT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return BigIntVal::null();
   return BigIntVal(*reinterpret_cast<int64_t*>(t->GetSlot(slot_offset_)));
 }
 
-FloatVal SlotRef::GetFloatVal(ExprContext* context, TupleRow* row) {
+FloatVal SlotRef::GetFloatVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_FLOAT);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return FloatVal::null();
   return FloatVal(*reinterpret_cast<float*>(t->GetSlot(slot_offset_)));
 }
 
-DoubleVal SlotRef::GetDoubleVal(ExprContext* context, TupleRow* row) {
+DoubleVal SlotRef::GetDoubleVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_DOUBLE);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return DoubleVal::null();
   return DoubleVal(*reinterpret_cast<double*>(t->GetSlot(slot_offset_)));
 }
 
-StringVal SlotRef::GetStringVal(ExprContext* context, TupleRow* row) {
+StringVal SlotRef::GetStringVal(ExprContext* context, const TupleRow* row) {
   DCHECK(type_.IsStringType());
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return StringVal::null();
@@ -433,7 +433,7 @@ StringVal SlotRef::GetStringVal(ExprContext* context, 
TupleRow* row) {
   return result;
 }
 
-TimestampVal SlotRef::GetTimestampVal(ExprContext* context, TupleRow* row) {
+TimestampVal SlotRef::GetTimestampVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK_EQ(type_.type, TYPE_TIMESTAMP);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
TimestampVal::null();
@@ -443,7 +443,7 @@ TimestampVal SlotRef::GetTimestampVal(ExprContext* context, 
TupleRow* row) {
   return result;
 }
 
-DecimalVal SlotRef::GetDecimalVal(ExprContext* context, TupleRow* row) {
+DecimalVal SlotRef::GetDecimalVal(ExprContext* context, const TupleRow* row) {
   DCHECK_EQ(type_.type, TYPE_DECIMAL);
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
DecimalVal::null();
@@ -460,7 +460,7 @@ DecimalVal SlotRef::GetDecimalVal(ExprContext* context, 
TupleRow* row) {
   }
 }
 
-CollectionVal SlotRef::GetCollectionVal(ExprContext* context, TupleRow* row) {
+CollectionVal SlotRef::GetCollectionVal(ExprContext* context, const TupleRow* 
row) {
   DCHECK(type_.IsCollectionType());
   Tuple* t = row->GetTuple(tuple_idx_);
   if (t == NULL || t->IsNull(null_indicator_offset_)) return 
CollectionVal::null();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/slot-ref.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/slot-ref.h b/be/src/exprs/slot-ref.h
index a00adbf..afe2191 100644
--- a/be/src/exprs/slot-ref.h
+++ b/be/src/exprs/slot-ref.h
@@ -42,17 +42,17 @@ class SlotRef : public Expr {
 
   virtual Status GetCodegendComputeFn(RuntimeState* state, llvm::Function** 
fn);
 
-  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, 
TupleRow*);
-  virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext* context, 
TupleRow*);
-  virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext* context, 
TupleRow*);
-  virtual impala_udf::IntVal GetIntVal(ExprContext* context, TupleRow*);
-  virtual impala_udf::BigIntVal GetBigIntVal(ExprContext* context, TupleRow*);
-  virtual impala_udf::FloatVal GetFloatVal(ExprContext* context, TupleRow*);
-  virtual impala_udf::DoubleVal GetDoubleVal(ExprContext* context, TupleRow*);
-  virtual impala_udf::StringVal GetStringVal(ExprContext* context, TupleRow*);
-  virtual impala_udf::TimestampVal GetTimestampVal(ExprContext* context, 
TupleRow*);
-  virtual impala_udf::DecimalVal GetDecimalVal(ExprContext* context, 
TupleRow*);
-  virtual impala_udf::CollectionVal GetCollectionVal(ExprContext* context, 
TupleRow*);
+  virtual impala_udf::BooleanVal GetBooleanVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::TinyIntVal GetTinyIntVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::SmallIntVal GetSmallIntVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::IntVal GetIntVal(ExprContext* context, const TupleRow*);
+  virtual impala_udf::BigIntVal GetBigIntVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::FloatVal GetFloatVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::DoubleVal GetDoubleVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::StringVal GetStringVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::TimestampVal GetTimestampVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::DecimalVal GetDecimalVal(ExprContext* context, const 
TupleRow*);
+  virtual impala_udf::CollectionVal GetCollectionVal(ExprContext* context, 
const TupleRow*);
 
  protected:
   int tuple_idx_;  // within row

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/tuple-is-null-predicate.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/tuple-is-null-predicate.cc 
b/be/src/exprs/tuple-is-null-predicate.cc
index ba96138..622caff 100644
--- a/be/src/exprs/tuple-is-null-predicate.cc
+++ b/be/src/exprs/tuple-is-null-predicate.cc
@@ -24,7 +24,7 @@
 
 namespace impala {
 
-BooleanVal TupleIsNullPredicate::GetBooleanVal(ExprContext* ctx, TupleRow* 
row) {
+BooleanVal TupleIsNullPredicate::GetBooleanVal(ExprContext* ctx, const 
TupleRow* row) {
   int count = 0;
   for (int i = 0; i < tuple_idxs_.size(); ++i) {
     count += row->GetTuple(tuple_idxs_[i]) == NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/exprs/tuple-is-null-predicate.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/tuple-is-null-predicate.h 
b/be/src/exprs/tuple-is-null-predicate.h
index c5bd2b5..62c021e 100644
--- a/be/src/exprs/tuple-is-null-predicate.h
+++ b/be/src/exprs/tuple-is-null-predicate.h
@@ -40,7 +40,7 @@ class TupleIsNullPredicate: public Predicate {
 
   virtual bool IsConstant() const { return false; }
 
-  virtual BooleanVal GetBooleanVal(ExprContext* context, TupleRow* row);
+  virtual BooleanVal GetBooleanVal(ExprContext* context, const TupleRow* row);
 
  private:
   /// Tuple ids to check for NULL. May contain ids of nullable and 
non-nullable tuples.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc 
b/be/src/runtime/data-stream-recvr.cc
index 10b0803..83decea 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -258,7 +258,7 @@ void DataStreamRecvr::SenderQueue::Close() {
 
 Status DataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
   DCHECK(is_merging_);
-  vector<SortedRunMerger::RunBatchSupplier> input_batch_suppliers;
+  vector<SortedRunMerger::RunBatchSupplierFn> input_batch_suppliers;
   input_batch_suppliers.reserve(sender_queues_.size());
 
   // Create the merger that will a single stream of sorted rows.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 4197977..294c080 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -198,6 +198,7 @@ class RowBatch {
   int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); }
   MemPool* tuple_data_pool() { return &tuple_data_pool_; }
   int num_io_buffers() const { return io_buffers_.size(); }
+  int num_blocks() const { return blocks_.size(); }
   int num_tuple_streams() const { return tuple_streams_.size(); }
 
   /// Resets the row batch, returning all resources it has accumulated.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/37ec2539/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc 
b/be/src/runtime/sorted-run-merger.cc
index 328d1c2..dd0ea30 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -1,4 +1,3 @@
-// Copyright 2012 Cloudera Inc.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -24,16 +23,16 @@
 
 namespace impala {
 
-/// BatchedRowSupplier returns individual rows in a batch obtained from a 
sorted input
-/// run (a RunBatchSupplier). Used as the heap element in the min heap 
maintained by the
+/// SortedRunWrapper returns individual rows in a batch obtained from a sorted 
input run
+/// (a RunBatchSupplierFn). Used as the heap element in the min heap 
maintained by the
 /// merger.
-/// Next() advances the row supplier to the next row in the input batch and 
retrieves
+/// Advance() advances the row supplier to the next row in the input batch and 
retrieves
 /// the next batch from the input if the current input batch is exhausted. 
Transfers
 /// ownership from the current input batch to an output batch if requested.
-class SortedRunMerger::BatchedRowSupplier {
+class SortedRunMerger::SortedRunWrapper {
  public:
   /// Construct an instance from a sorted input run.
-  BatchedRowSupplier(SortedRunMerger* parent, const RunBatchSupplier& 
sorted_run)
+  SortedRunWrapper(SortedRunMerger* parent, const RunBatchSupplierFn& 
sorted_run)
     : sorted_run_(sorted_run),
       input_row_batch_(NULL),
       input_row_batch_index_(-1),
@@ -41,35 +40,44 @@ class SortedRunMerger::BatchedRowSupplier {
   }
 
   /// Retrieves the first batch of sorted rows from the run.
-  Status Init(bool* done) {
-    *done = false;
+  Status Init(bool* eos) {
+    *eos = false;
     RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
     if (input_row_batch_ == NULL) {
-      *done = true;
+      *eos = true;
       return Status::OK();
     }
-    RETURN_IF_ERROR(Next(NULL, done));
+    RETURN_IF_ERROR(Advance(NULL, eos));
     return Status::OK();
   }
 
-  /// Increment the current row index. If the current input batch is exhausted 
fetch the
+  /// Increment the current row index. If the current input batch is 
exhausted, fetch the
   /// next one from the sorted run. Transfer ownership to transfer_batch if 
not NULL.
-  Status Next(RowBatch* transfer_batch, bool* done) {
+  Status Advance(RowBatch* transfer_batch, bool* eos) {
     DCHECK(input_row_batch_ != NULL);
     ++input_row_batch_index_;
     if (input_row_batch_index_ < input_row_batch_->num_rows()) {
-      *done = false;
-    } else {
-      ScopedTimer<MonotonicStopWatch> timer(parent_->get_next_batch_timer_);
+      *eos = false;
+      return Status::OK();
+    }
+
+    // Iterate until we hit eos or get a non-empty batch.
+    do {
+      // Make sure to transfer resources from every batch received from 
'sorted_run_'.
       if (transfer_batch != NULL) {
+        DCHECK(!input_row_batch_->need_to_return()) << "Run batch suppliers 
that set the "
+          "need_to_return flag must use a deep-copying merger";
         input_row_batch_->TransferResourceOwnership(transfer_batch);
       }
 
-      RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
-      DCHECK(input_row_batch_ == NULL || input_row_batch_->num_rows() > 0);
-      *done = input_row_batch_ == NULL;
-      input_row_batch_index_ = 0;
-    }
+      {
+        ScopedTimer<MonotonicStopWatch> timer(parent_->get_next_batch_timer_);
+        RETURN_IF_ERROR(sorted_run_(&input_row_batch_));
+      }
+    } while (input_row_batch_ != NULL && input_row_batch_->num_rows() == 0);
+
+    *eos = input_row_batch_ == NULL;
+    input_row_batch_index_ = 0;
     return Status::OK();
   }
 
@@ -81,7 +89,7 @@ class SortedRunMerger::BatchedRowSupplier {
   friend class SortedRunMerger;
 
   /// The run from which this object supplies rows.
-  RunBatchSupplier sorted_run_;
+  RunBatchSupplierFn sorted_run_;
 
   /// The current input batch being processed.
   RowBatch* input_row_batch_;
@@ -125,11 +133,11 @@ SortedRunMerger::SortedRunMerger(const 
TupleRowComparator& comparator,
   get_next_batch_timer_ = ADD_TIMER(profile, "MergeGetNextBatch");
 }
 
-Status SortedRunMerger::Prepare(const vector<RunBatchSupplier>& input_runs) {
+Status SortedRunMerger::Prepare(const vector<RunBatchSupplierFn>& input_runs) {
   DCHECK_EQ(min_heap_.size(), 0);
   min_heap_.reserve(input_runs.size());
-  for (const RunBatchSupplier& input_run: input_runs) {
-    BatchedRowSupplier* new_elem = pool_.Add(new BatchedRowSupplier(this, 
input_run));
+  for (const RunBatchSupplierFn& input_run: input_runs) {
+    SortedRunWrapper* new_elem = pool_.Add(new SortedRunWrapper(this, 
input_run));
     DCHECK(new_elem != NULL);
     bool empty;
     RETURN_IF_ERROR(new_elem->Init(&empty));
@@ -146,13 +154,9 @@ Status SortedRunMerger::Prepare(const 
vector<RunBatchSupplier>& input_runs) {
 
 Status SortedRunMerger::GetNext(RowBatch* output_batch, bool* eos) {
   ScopedTimer<MonotonicStopWatch> timer(get_next_timer_);
-  if (min_heap_.empty()) {
-    *eos = true;
-    return Status::OK();
-  }
 
-  while (!output_batch->AtCapacity()) {
-    BatchedRowSupplier* min = min_heap_[0];
+  while (!output_batch->AtCapacity() && !min_heap_.empty()) {
+    SortedRunWrapper* min = min_heap_[0];
     int output_row_index = output_batch->AddRow();
     TupleRow* output_row = output_batch->GetRow(output_row_index);
     if (deep_copy_input_) {
@@ -165,20 +169,7 @@ Status SortedRunMerger::GetNext(RowBatch* output_batch, 
bool* eos) {
     }
 
     output_batch->CommitLastRow();
-
-    bool min_run_complete = false;
-    // Advance to the next element in min. output_batch is supplied to transfer
-    // resource ownership if the input batch in min is exhausted.
-    RETURN_IF_ERROR(min->Next(deep_copy_input_ ? NULL : output_batch,
-        &min_run_complete));
-    if (min_run_complete) {
-      // Remove the element from the heap.
-      iter_swap(min_heap_.begin(), min_heap_.end() - 1);
-      min_heap_.pop_back();
-      if (min_heap_.empty()) break;
-    }
-
-    Heapify(0);
+    RETURN_IF_ERROR(AdvanceMinRow(output_batch));
   }
   // Free local allocations made by comparator_.Less();
   comparator_.FreeLocalAllocations();
@@ -187,4 +178,20 @@ Status SortedRunMerger::GetNext(RowBatch* output_batch, 
bool* eos) {
   return Status::OK();
 }
 
+Status SortedRunMerger::AdvanceMinRow(RowBatch* transfer_batch) {
+  SortedRunWrapper* min = min_heap_[0];
+  bool min_run_complete;
+  // Advance to the next element in min. output_batch is supplied to transfer
+  // resource ownership if the input batch in min is exhausted.
+  RETURN_IF_ERROR(min->Advance(deep_copy_input_ ? NULL : transfer_batch,
+      &min_run_complete));
+  if (min_run_complete) {
+    // Remove the element from the heap.
+    iter_swap(min_heap_.begin(), min_heap_.end() - 1);
+    min_heap_.pop_back();
+  }
+  if (!min_heap_.empty()) Heapify(0);
+  return Status::OK();
+}
+
 }

Reply via email to