Repository: incubator-impala
Updated Branches:
  refs/heads/master 48085274f -> ff6b450ad


IMPALA-4120: Incorrect results with LEAD() analytic function

This change fixes a memory management problem with the LEAD()/LAG()
analytic functions which led to incorrect results. In particular,
the update functions specified for these analytic functions only
make a shallow copy of a StringVal (i.e. copying only the pointer
and the length of the string) without copying the string itself.
This may lead to problems if the string is created by a UDF
which does local allocations whose buffers may be freed and reused
before the result tuple is copied out. This change fixes the problem
above by allocating a buffer in the Init() functions of these
analytic functions to track the intermediate value. In addition,
when the value is copied out in GetValue(), it will be copied into
the MemPool belonging to the AnalyticEvalNode and attached to the
outgoing row batches. This change also fixes a missing free of
local allocations in QueryMaintenance().

Change-Id: I85bb1745232d8dd383a6047c86019c6378ab571f
Reviewed-on: http://gerrit.cloudera.org:8080/4740
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/51268c05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/51268c05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/51268c05

Branch: refs/heads/master
Commit: 51268c053ffe41dc1aa9f1b250878113d4225258
Parents: 4808527
Author: Michael Ho <[email protected]>
Authored: Mon Sep 26 16:04:27 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Sat Oct 22 07:39:37 2016 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc               | 88 +++++++++++++-------
 be/src/exec/analytic-eval-node.h                | 28 ++++---
 be/src/exprs/agg-fn-evaluator.h                 | 27 +++---
 be/src/exprs/aggregate-functions-ir.cc          | 11 ++-
 be/src/udf/udf.cc                               |  3 +-
 .../org/apache/impala/catalog/BuiltinsDb.java   |  4 +-
 .../queries/QueryTest/analytic-fns.test         | 14 ++++
 7 files changed, 120 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc 
b/be/src/exec/analytic-eval-node.cc
index 21a0805..06b0467 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -359,7 +359,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, 
TupleRow* row) {
     }
   }
 
-  Status status = Status::OK();
+  Status status;
   // Buffer the entire input row to be returned later with the analytic eval 
results.
   if (UNLIKELY(!input_stream_->AddRow(row, &status))) {
     // AddRow returns false if an error occurs (available via status()) or 
there is
@@ -377,45 +377,62 @@ inline Status AnalyticEvalNode::AddRow(int64_t 
stream_idx, TupleRow* row) {
     }
   }
   DCHECK(status.ok());
-  return status;
+  return Status::OK();
 }
 
-void AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
+Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
   VLOG_ROW << id() << " AddResultTuple idx=" << stream_idx;
   DCHECK(curr_tuple_ != NULL);
-  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(),
-      curr_tuple_pool_.get());
+  MemPool* cur_tuple_pool = curr_tuple_pool_.get();
+  Tuple* result_tuple = Tuple::Create(result_tuple_desc_->byte_size(), 
cur_tuple_pool);
 
   AggFnEvaluator::GetValue(evaluators_, fn_ctxs_, curr_tuple_, result_tuple);
+  // Copy any string data in 'result_tuple' into 'cur_tuple_pool_'.
+  for (const SlotDescriptor* slot_desc : result_tuple_desc_->slots()) {
+    if (!slot_desc->type().IsVarLenStringType()) continue;
+    StringValue* sv = reinterpret_cast<StringValue*>(
+        result_tuple->GetSlot(slot_desc->tuple_offset()));
+    if (sv == NULL || sv->len == 0) continue;
+    char* new_ptr = 
reinterpret_cast<char*>(cur_tuple_pool->TryAllocate(sv->len));
+    if (UNLIKELY(new_ptr == NULL)) {
+      return cur_tuple_pool->mem_tracker()->MemLimitExceeded(NULL,
+          "Failed to allocate memory for analytic function's result.", 
sv->len);
+    }
+    memcpy(new_ptr, sv->ptr, sv->len);
+    sv->ptr = new_ptr;
+  }
+
   DCHECK_GT(stream_idx, last_result_idx_);
   result_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, result_tuple));
   last_result_idx_ = stream_idx;
   VLOG_ROW << id() << " Added result tuple, final state: " << 
DebugStateString(true);
+  return Status::OK();
 }
 
-inline void AnalyticEvalNode::TryAddResultTupleForPrevRow(bool next_partition,
+inline Status AnalyticEvalNode::TryAddResultTupleForPrevRow(bool 
next_partition,
     int64_t stream_idx, TupleRow* row) {
   // The analytic fns are finalized after the previous row if we found a new 
partition
   // or the window is a RANGE and the order by exprs changed. For ROWS windows 
we do not
   // need to compare the current row to the previous row.
   VLOG_ROW << id() << " TryAddResultTupleForPrevRow partition=" << 
next_partition
            << " idx=" << stream_idx;
-  if (fn_scope_ == ROWS) return;
-  if (next_partition || (fn_scope_ == RANGE && window_.__isset.window_end &&
-      !PrevRowCompare(order_by_eq_expr_ctx_))) {
-    AddResultTuple(stream_idx - 1);
+  if (fn_scope_ != ROWS && (next_partition || (fn_scope_ == RANGE &&
+      window_.__isset.window_end && !PrevRowCompare(order_by_eq_expr_ctx_)))) {
+    RETURN_IF_ERROR(AddResultTuple(stream_idx - 1));
   }
+  return Status::OK();
 }
 
-inline void AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
+inline Status AnalyticEvalNode::TryAddResultTupleForCurrRow(int64_t stream_idx,
     TupleRow* row) {
   VLOG_ROW << id() << " TryAddResultTupleForCurrRow idx=" << stream_idx;
   // We only add results at this point for ROWS windows (unless unbounded 
following)
-  if (fn_scope_ != ROWS || !window_.__isset.window_end) return;
-
   // Nothing to add if the end offset is before the start of the partition.
-  if (stream_idx - rows_end_offset_ < curr_partition_idx_) return;
-  AddResultTuple(stream_idx - rows_end_offset_);
+  if (fn_scope_ == ROWS && window_.__isset.window_end &&
+      stream_idx - rows_end_offset_ >= curr_partition_idx_) {
+    RETURN_IF_ERROR(AddResultTuple(stream_idx - rows_end_offset_));
+  }
+  return Status::OK();
 }
 
 inline void AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
@@ -435,21 +452,27 @@ inline void 
AnalyticEvalNode::TryRemoveRowsBeforeWindow(int64_t stream_idx) {
   window_tuples_.pop_front();
 }
 
-inline void AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
+inline Status AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
     int64_t prev_partition_idx) {
   DCHECK_LT(prev_partition_idx, partition_idx);
   // For PARTITION, RANGE, or ROWS with UNBOUNDED PRECEDING: add a result 
tuple for the
   // remaining rows in the partition that do not have an associated result 
tuple yet.
   if (fn_scope_ != ROWS || !window_.__isset.window_end) {
-    if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 
1);
-    return;
+    if (last_result_idx_ < partition_idx - 1) {
+      RETURN_IF_ERROR(AddResultTuple(partition_idx - 1));
+    }
+    return Status::OK();
   }
 
   // lead() is re-written to a ROWS window with an end bound FOLLOWING. Any 
remaining
   // results need the default value (set by Init()). If this is the case, the 
start bound
   // is UNBOUNDED PRECEDING (DCHECK in Init()).
   for (int i = 0; i < evaluators_.size(); ++i) {
-    if (is_lead_fn_[i]) evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
+    if (is_lead_fn_[i]) {
+      // Needs to call Finalize() to release resources.
+      evaluators_[i]->Finalize(fn_ctxs_[i], curr_tuple_, dummy_result_tuple_);
+      evaluators_[i]->Init(fn_ctxs_[i], curr_tuple_);
+    }
   }
 
   // If the start bound is not UNBOUNDED PRECEDING and there are still rows in 
the
@@ -470,13 +493,16 @@ inline void 
AnalyticEvalNode::TryAddRemainingResults(int64_t partition_idx,
       AggFnEvaluator::Remove(evaluators_, fn_ctxs_, remove_row, curr_tuple_);
       window_tuples_.pop_front();
     }
-    AddResultTuple(last_result_idx_ + 1);
+    RETURN_IF_ERROR(AddResultTuple(last_result_idx_ + 1));
   }
 
   // If there are still rows between the row with the last result 
(AddResultTuple() may
   // have updated last_result_idx_) and the partition boundary, add the 
current results
   // for the remaining rows with the same result tuple (curr_tuple_ is not 
modified).
-  if (last_result_idx_ < partition_idx - 1) AddResultTuple(partition_idx - 1);
+  if (last_result_idx_ < partition_idx - 1) {
+    RETURN_IF_ERROR(AddResultTuple(partition_idx - 1));
+  }
+  return Status::OK();
 }
 
 inline Status AnalyticEvalNode::InitNextPartition(RuntimeState* state,
@@ -523,7 +549,7 @@ inline Status 
AnalyticEvalNode::InitNextPartition(RuntimeState* state,
 
   if (fn_scope_ == ROWS && stream_idx > 0 && (!window_.__isset.window_end ||
         window_.window_end.type == TAnalyticWindowBoundaryType::FOLLOWING)) {
-    TryAddRemainingResults(stream_idx, prev_partition_stream_idx);
+    RETURN_IF_ERROR(TryAddRemainingResults(stream_idx, 
prev_partition_stream_idx));
   }
   window_tuples_.clear();
 
@@ -557,10 +583,10 @@ inline Status 
AnalyticEvalNode::InitNextPartition(RuntimeState* state,
       // 2) Insert the initial result tuple at first_val_null_offset_. This 
happens when
       //    the end bound was actually Y PRECEDING.
       if (first_val_null_offset_ != -1) {
-        AddResultTuple(curr_partition_idx_ + first_val_null_offset_ - 1);
+        RETURN_IF_ERROR(AddResultTuple(curr_partition_idx_ + 
first_val_null_offset_ - 1));
       }
     } else {
-      AddResultTuple(curr_partition_idx_ - rows_end_offset_ - 1);
+      RETURN_IF_ERROR(AddResultTuple(curr_partition_idx_ - rows_end_offset_ - 
1));
     }
   }
   return Status::OK();
@@ -614,7 +640,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* 
state) {
   if (UNLIKELY(stream_idx == 0 && curr_child_batch_->num_rows() > 0)) {
     TupleRow* row = curr_child_batch_->GetRow(0);
     RETURN_IF_ERROR(AddRow(0, row));
-    TryAddResultTupleForCurrRow(0, row);
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(0, row));
     prev_input_row_ = row;
     ++batch_idx;
     ++stream_idx;
@@ -647,19 +673,19 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* 
state) {
       // partition_by_eq_expr_ctx_ checks equality over the predicate exprs
       next_partition = !PrevRowCompare(partition_by_eq_expr_ctx_);
     }
-    TryAddResultTupleForPrevRow(next_partition, stream_idx, row);
+    RETURN_IF_ERROR(TryAddResultTupleForPrevRow(next_partition, stream_idx, 
row));
     if (next_partition) RETURN_IF_ERROR(InitNextPartition(state, stream_idx));
 
     // The evaluators_ are updated with the current row.
     RETURN_IF_ERROR(AddRow(stream_idx, row));
 
-    TryAddResultTupleForCurrRow(stream_idx, row);
+    RETURN_IF_ERROR(TryAddResultTupleForCurrRow(stream_idx, row));
     prev_input_row_ = row;
   }
 
   if (UNLIKELY(input_eos_ && stream_idx > curr_partition_idx_)) {
     // We need to add the results for the last row(s).
-    TryAddRemainingResults(stream_idx, curr_partition_idx_);
+    RETURN_IF_ERROR(TryAddRemainingResults(stream_idx, curr_partition_idx_));
   }
 
   // Transfer resources to prev_tuple_pool_ when enough resources have 
accumulated
@@ -846,7 +872,10 @@ void AnalyticEvalNode::Close(RuntimeState* state) {
     }
     evaluators_[i]->Close(state);
   }
-  for (int i = 0; i < fn_ctxs_.size(); ++i) fn_ctxs_[i]->impl()->Close();
+  for (int i = 0; i < fn_ctxs_.size(); ++i) {
+    fn_ctxs_[i]->impl()->FreeLocalAllocations();
+    fn_ctxs_[i]->impl()->Close();
+  }
 
   if (partition_by_eq_expr_ctx_ != NULL) 
partition_by_eq_expr_ctx_->Close(state);
   if (order_by_eq_expr_ctx_ != NULL) order_by_eq_expr_ctx_->Close(state);
@@ -878,6 +907,7 @@ Status AnalyticEvalNode::QueryMaintenance(RuntimeState* 
state) {
   for (int i = 0; i < evaluators_.size(); ++i) {
     ExprContext::FreeLocalAllocations(evaluators_[i]->input_expr_ctxs());
   }
+  ExprContext::FreeLocalAllocations(fn_ctxs_);
   return ExecNode::QueryMaintenance(state);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 68b5bbc..579ad5b 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -132,21 +132,24 @@ class AnalyticEvalNode : public ExecNode {
   Status AddRow(int64_t stream_idx, TupleRow* row);
 
   /// Determines if there is a window ending at the previous row, and if so, 
calls
-  /// AddResultTuple() with the index of the previous row in input_stream_. 
next_partition
-  /// indicates if the current row is the start of a new partition. stream_idx 
is the
-  /// index of the current input row from input_stream_.
-  void TryAddResultTupleForPrevRow(bool next_partition, int64_t stream_idx,
+  /// AddResultTuple() with the index of the previous row in 'input_stream_'.
+  /// 'next_partition' indicates if the current row is the start of a new 
partition.
+  /// 'stream_idx' is the index of the current input row from 'input_stream_'.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddResultTupleForPrevRow(bool next_partition, int64_t stream_idx,
       TupleRow* row);
 
   /// Determines if there is a window ending at the current row, and if so, 
calls
-  /// AddResultTuple() with the index of the current row in input_stream_. 
stream_idx is
-  /// the index of the current input row from input_stream_.
-  void TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row);
+  /// AddResultTuple() with the index of the current row in 'input_stream_'.
+  /// 'stream_idx' is the index of the current input row from 'input_stream_'.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddResultTupleForCurrRow(int64_t stream_idx, TupleRow* row);
 
   /// Adds additional result tuples at the end of a partition, e.g. if the end 
bound is
   /// FOLLOWING. partition_idx is the index into input_stream_ of the new 
partition,
-  /// prev_partition_idx is the index of the previous partition.
-  void TryAddRemainingResults(int64_t partition_idx, int64_t 
prev_partition_idx);
+  /// 'prev_partition_idx' is the index of the previous partition.
+  /// Returns an error when memory limit is exceeded.
+  Status TryAddRemainingResults(int64_t partition_idx, int64_t 
prev_partition_idx);
 
   /// Removes rows from curr_tuple_ (by calling AggFnEvaluator::Remove()) that 
are no
   /// longer in the window (i.e. they are before the window start boundary). 
stream_idx
@@ -159,9 +162,10 @@ class AnalyticEvalNode : public ExecNode {
   Status InitNextPartition(RuntimeState* state, int64_t stream_idx);
 
   /// Produces a result tuple with analytic function results by calling 
GetValue() or
-  /// Finalize() for curr_tuple_ on the evaluators_. The result tuple is 
stored in
-  /// result_tuples_ with the index into input_stream_ specified by stream_idx.
-  void AddResultTuple(int64_t stream_idx);
+  /// Finalize() for 'curr_tuple_' on the 'evaluators_'. The result tuple is 
stored in
+  /// 'result_tuples_' with the index into 'input_stream_' specified by 
'stream_idx'.
+  /// Returns an error when memory limit is exceeded.
+  Status AddResultTuple(int64_t stream_idx);
 
   /// Gets the number of rows that are ready to be returned by subsequent 
calls to
   /// GetNextOutputBatch().

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exprs/agg-fn-evaluator.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/agg-fn-evaluator.h b/be/src/exprs/agg-fn-evaluator.h
index cde969c..b3ecda0 100644
--- a/be/src/exprs/agg-fn-evaluator.h
+++ b/be/src/exprs/agg-fn-evaluator.h
@@ -147,9 +147,11 @@ class AggFnEvaluator {
   void Finalize(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
 
   /// Puts the finalized value from Tuple* src in Tuple* dst just as 
Finalize() does.
-  /// However, unlike Finalize(), GetValue() does not clean up state in src. 
GetValue()
-  /// can be called repeatedly with the same src. Only used internally for 
analytic fn
-  /// builtins.
+  /// However, unlike Finalize(), GetValue() does not clean up state in src.
+  /// GetValue() can be called repeatedly with the same src. Only used 
internally for
+  /// analytic fn builtins. Note that StringVal result is from local 
allocation (which
+  /// will be freed in the next QueryMaintenance()) so it needs to be copied 
out if it
+  /// needs to survive beyond QueryMaintenance() (e.g. if 'dst' lives in a row 
batch).
   void GetValue(FunctionContext* agg_fn_ctx, Tuple* src, Tuple* dst);
 
   /// Helper functions for calling the above functions on many evaluators.
@@ -229,10 +231,13 @@ class AggFnEvaluator {
   /// fn must be a function that implement's the UDA Update() signature.
   void Update(FunctionContext* agg_fn_ctx, const TupleRow* row, Tuple* dst, 
void* fn);
 
-  /// Sets up the arguments to call fn. This converts from the agg-expr 
signature,
+  /// Sets up the arguments to call 'fn'. This converts from the agg-expr 
signature,
   /// taking TupleRow to the UDA signature taking AnvVals. Writes the 
serialize/finalize
-  /// result to the given destination slot/tuple. The fn can be NULL to 
indicate the src
-  /// value should simply be written into the destination.
+  /// result to the given destination slot/tuple. 'fn' can be NULL to indicate 
the src
+  /// value should simply be written into the destination. Note that StringVal 
result is
+  /// from local allocation (which will be freed in the next 
QueryMaintenance()) so it
+  /// needs to be copied out if it needs to survive beyond QueryMaintenance() 
(e.g. if
+  /// 'dst' lives in a row batch).
   void SerializeOrFinalize(FunctionContext* agg_fn_ctx, Tuple* src,
       const SlotDescriptor* dst_slot_desc, Tuple* dst, void* fn);
 
@@ -265,7 +270,7 @@ inline void AggFnEvaluator::GetValue(
 }
 
 inline void AggFnEvaluator::Init(const std::vector<AggFnEvaluator*>& 
evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Init(fn_ctxs[i], dst);
@@ -279,28 +284,28 @@ inline void AggFnEvaluator::Add(const 
std::vector<AggFnEvaluator*>& evaluators,
   }
 }
 inline void AggFnEvaluator::Remove(const std::vector<AggFnEvaluator*>& 
evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, 
Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, const TupleRow* src, Tuple* 
dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Remove(fn_ctxs[i], src, dst);
   }
 }
 inline void AggFnEvaluator::Serialize(const std::vector<AggFnEvaluator*>& 
evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Serialize(fn_ctxs[i], dst);
   }
 }
 inline void AggFnEvaluator::GetValue(const std::vector<AggFnEvaluator*>& 
evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->GetValue(fn_ctxs[i], src, dst);
   }
 }
 inline void AggFnEvaluator::Finalize(const std::vector<AggFnEvaluator*>& 
evaluators,
-      const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
+    const std::vector<FunctionContext*>& fn_ctxs, Tuple* src, Tuple* dst) {
   DCHECK_EQ(evaluators.size(), fn_ctxs.size());
   for (int i = 0; i < evaluators.size(); ++i) {
     evaluators[i]->Finalize(fn_ctxs[i], src, dst);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc 
b/be/src/exprs/aggregate-functions-ir.cc
index 44b72a3..dd81005 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -1583,10 +1583,19 @@ void AggregateFunctions::OffsetFnInit(FunctionContext* 
ctx, T* dst) {
   *dst = *static_cast<T*>(ctx->GetConstantArg(2));
 }
 
+template <>
+void AggregateFunctions::OffsetFnInit(FunctionContext* ctx, StringVal* dst) {
+  DCHECK_EQ(ctx->GetNumArgs(), 3);
+  DCHECK(ctx->IsArgConstant(1));
+  DCHECK(ctx->IsArgConstant(2));
+  DCHECK_EQ(ctx->GetArgType(0)->type, ctx->GetArgType(2)->type);
+  CopyStringVal(ctx, *static_cast<StringVal*>(ctx->GetConstantArg(2)), dst);
+}
+
 template <typename T>
 void AggregateFunctions::OffsetFnUpdate(FunctionContext* ctx, const T& src,
     const BigIntVal&, const T& default_value, T* dst) {
-  *dst = src;
+  UpdateVal(ctx, src, dst);
 }
 
 // Stamp out the templates for the types we need.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 13634dc..4ad8d8b 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -194,7 +194,8 @@ void FunctionContextImpl::Close() {
   stringstream error_ss;
   if (!debug_) {
     if (pool_->net_allocations() > 0) {
-      error_ss << "Memory leaked via FunctionContext::Allocate()";
+      error_ss << "Memory leaked via FunctionContext::Allocate() "
+               << "or FunctionContext::AllocateLocal()";
     } else if (pool_->net_allocations() < 0) {
       error_ss << "FunctionContext::Free() called on buffer that was already 
freed or "
                   "was not allocated.";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java 
b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
index fb4accc..4fcecb2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
+++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java
@@ -1035,7 +1035,9 @@ public class BuiltinsDb extends Db {
           db, "lead", Lists.newArrayList(t, Type.BIGINT, t), t, t,
           prefix + OFFSET_FN_INIT_SYMBOL.get(t),
           prefix + OFFSET_FN_UPDATE_SYMBOL.get(t),
-          null, null, null));
+          null,
+          t == Type.STRING ? stringValGetValue : null,
+          t == Type.STRING ? stringValSerializeOrFinalize : null));
 
       // lead() and lag() the default offset and the default value should be
       // rewritten to call the overrides that take all parameters.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/51268c05/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
----------------------------------------------------------------------
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test 
b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
index 93431bf..62c9a67 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/analytic-fns.test
@@ -1893,3 +1893,17 @@ DECIMAL, DECIMAL
 12345,1234
 132842,1234
 ====
+---- QUERY
+# Regression test for IMPALA-4120: Invoke UDFs which do local allocations and 
verifies
+# that the results are copied out.
+select count(*) from (
+select
+  from_unixtime(lead(bigint_col, 1) over (order by id), 'yyyyMMddHH:mm:ss') as 
a,
+  lead(from_unixtime(bigint_col, 'yyyyMMddHH:mm:ss'), 1) over (order by id) AS 
b
+from functional.alltypes) x
+where x.a = x.b
+---- TYPES
+BIGINT
+---- RESULTS
+7299
+====

Reply via email to