Repository: incubator-impala
Updated Branches:
refs/heads/master 62894e323 -> 0ff1e6e8d
IMPALA-4787: Optimize APPX_MEDIAN() memory usage
Before this change, ReservoirSample functions (such as APPX_MEDIAN())
allocated memory for 20,000 elements up front per grouping key. This
caused inefficient memory usage for aggregations with many grouping
keys.
This patch fixes this by initially allocating memory for 16 elements.
Once the buffer becomes full, we reallocate a new buffer with double
capacity and copy the original buffer into the new one. We continue
doubling the buffer size until the buffer has room for 20,000 elements
as before.
Testing:
Added some EE APPX_MEDIAN() tests on larger datasets that exercise the
resize code path.
Perf Benchmark (about 35,000 elements per bucket):
SELECT MAX(a) from (
SELECT c1, appx_median(c2) as a FROM benchmark GROUP BY c1) t
BEFORE: 11s067ms
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est.
Peak Mem Detail
-------------------------------------------------------------------------------------------------------------------------
06:AGGREGATE 1 124.726us 124.726us 1 1 28.00 KB
-1.00 B FINALIZE
05:EXCHANGE 1 29.544us 29.544us 3 1 0
-1.00 B UNPARTITIONED
02:AGGREGATE 3 86.406us 120.372us 3 1 44.00 KB
10.00 MB
04:AGGREGATE 3 1s840ms 2s824ms 2.00K -1 1.02 GB
128.00 MB FINALIZE
03:EXCHANGE 3 1s163ms 1s989ms 6.00K -1 0
0 HASH(c1)
01:AGGREGATE 3 3s356ms 3s416ms 6.00K -1 1.95 GB
128.00 MB STREAMING
00:SCAN HDFS 3 64.962ms 65.490ms 65.54M -1 25.97 MB
64.00 MB tpcds_10_parquet.benchmark
AFTER: 9s465ms
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem Est.
Peak Mem Detail
------------------------------------------------------------------------------------------------------------------------
06:AGGREGATE 1 73.961us 73.961us 1 1 28.00 KB
-1.00 B FINALIZE
05:EXCHANGE 1 18.101us 18.101us 3 1 0
-1.00 B UNPARTITIONED
02:AGGREGATE 3 75.795us 83.969us 3 1 44.00 KB
10.00 MB
04:AGGREGATE 3 1s608ms 2s683ms 2.00K -1 1.02 GB
128.00 MB FINALIZE
03:EXCHANGE 3 826.683ms 1s322ms 6.00K -1 0
0 HASH(c1)
01:AGGREGATE 3 2s457ms 2s672ms 6.00K -1 3.14 GB
128.00 MB STREAMING
00:SCAN HDFS 3 81.514ms 89.056ms 65.54M -1 25.94 MB
64.00 MB tpcds_10_parquet.benchmark
Memory Benchmark (about 12 elements per bucket):
SELECT MAX(a) FROM (
SELECT ss_customer_sk, APPX_MEDIAN(ss_sold_date_sk) as a
FROM tpcds_parquet.store_sales
GROUP BY ss_customer_sk) t
BEFORE: 7s477ms
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem
Est. Peak Mem Detail
---------------------------------------------------------------------------------------------------------------------
06:AGGREGATE 1 114.686us 114.686us 1 1 28.00 KB
-1.00 B FINALIZE
05:EXCHANGE 1 18.214us 18.214us 3 1 0
-1.00 B UNPARTITIONED
02:AGGREGATE 3 147.055us 165.464us 3 1 28.00 KB
10.00 MB
04:AGGREGATE 3 2s043ms 2s147ms 14.82K -1 4.94 GB
128.00 MB FINALIZE
03:EXCHANGE 3 840.528ms 943.254ms 15.61K -1 0
0 HASH(ss_customer_sk)
01:AGGREGATE 3 1s769ms 1s869ms 15.61K -1 5.32 GB
128.00 MB STREAMING
00:SCAN HDFS 3 17.941ms 37.109ms 183.59K -1 1.94 MB
16.00 MB tpcds_parquet.store_sales
AFTER: 434ms
Operator #Hosts Avg Time Max Time #Rows Est. #Rows Peak Mem
Est. Peak Mem Detail
---------------------------------------------------------------------------------------------------------------------
06:AGGREGATE 1 125.915us 125.915us 1 1 28.00 KB
-1.00 B FINALIZE
05:EXCHANGE 1 72.179us 72.179us 3 1 0
-1.00 B UNPARTITIONED
02:AGGREGATE 3 79.054us 83.385us 3 1 28.00 KB
10.00 MB
04:AGGREGATE 3 6.559ms 7.669ms 14.82K -1 17.32 MB
128.00 MB FINALIZE
03:EXCHANGE 3 67.370us 85.068us 15.60K -1 0
0 HASH(ss_customer_sk)
01:AGGREGATE 3 19.245ms 24.472ms 15.60K -1 9.48 MB
128.00 MB STREAMING
00:SCAN HDFS 3 53.173ms 55.844ms 183.59K -1 1.18 MB
16.00 MB tpcds_parquet.store_sales
Change-Id: I99adaad574d4fb0a3cf38c6cbad8b2a23df12968
Reviewed-on: http://gerrit.cloudera.org:8080/6025
Reviewed-by: Taras Bobrovytsky <[email protected]>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/529a5f99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/529a5f99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/529a5f99
Branch: refs/heads/master
Commit: 529a5f99b959079faead34a977fba1125d01840e
Parents: 62894e3
Author: Taras Bobrovytsky <[email protected]>
Authored: Mon Feb 13 18:14:56 2017 -0800
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Mar 16 05:59:40 2017 +0000
----------------------------------------------------------------------
be/src/exprs/aggregate-functions-ir.cc | 441 +++++++++++++------
.../queries/QueryTest/aggregation.test | 17 +-
.../queries/QueryTest/alloc-fail-init.test | 4 +-
3 files changed, 316 insertions(+), 146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/be/src/exprs/aggregate-functions-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/aggregate-functions-ir.cc
b/be/src/exprs/aggregate-functions-ir.cc
index 10a775d..fc4715a 100644
--- a/be/src/exprs/aggregate-functions-ir.cc
+++ b/be/src/exprs/aggregate-functions-ir.cc
@@ -40,10 +40,11 @@
using boost::uniform_int;
using boost::ranlux64_3;
-using std::push_heap;
-using std::pop_heap;
-using std::map;
using std::make_pair;
+using std::map;
+using std::nth_element;
+using std::pop_heap;
+using std::push_heap;
namespace {
// Threshold for each precision where it's better to use linear counting
instead
@@ -122,9 +123,6 @@ int64_t HllEstimateBias(int64_t estimate) {
}
-// TODO: this file should be cross compiled and then all of the builtin
-// aggregate functions will have a codegen enabled path. Then we can remove
-// the custom code in aggregation node.
namespace impala {
// This function initializes StringVal 'dst' with a newly allocated buffer of
@@ -902,8 +900,6 @@ BigIntVal AggregateFunctions::PcsaFinalize(FunctionContext*
c, const StringVal&
// TODO: Expose as constant argument parameters to the UDA.
const static int NUM_BUCKETS = 100;
const static int NUM_SAMPLES_PER_BUCKET = 200;
-const static int NUM_SAMPLES = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET;
-const static int MAX_STRING_SAMPLE_LEN = 10;
template <typename T>
struct ReservoirSample {
@@ -919,6 +915,9 @@ struct ReservoirSample {
T GetValue(FunctionContext* ctx) { return val; }
};
+// Maximum length of a string sample.
+const static int MAX_STRING_SAMPLE_LEN = 10;
+
// Template specialization for StringVal because we do not store the StringVal
itself.
// Instead, we keep fixed size arrays and truncate longer strings if necessary.
template <>
@@ -941,22 +940,267 @@ struct ReservoirSample<StringVal> {
};
template <typename T>
-struct ReservoirSampleState {
- ReservoirSample<T> samples[NUM_SAMPLES];
+bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
+ return i.val.val < j.val.val;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<StringVal>& i,
+ const ReservoirSample<StringVal>& j) {
+ int n = min(i.len, j.len);
+ int result = memcmp(&i.val[0], &j.val[0], n);
+ if (result == 0) return i.len < j.len;
+ return result < 0;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<DecimalVal>& i,
+ const ReservoirSample<DecimalVal>& j) {
+ // Also handles val4 and val8 - the DecimalVal memory layout ensures the
least
+ // significant bits overlap in memory.
+ return i.val.val16 < j.val.val16;
+}
+
+template <>
+bool SampleValLess(const ReservoirSample<TimestampVal>& i,
+ const ReservoirSample<TimestampVal>& j) {
+ if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
+ else return i.val.date < j.val.date;
+}
+
+template <typename T>
+bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>&
j) {
+ return i.key > j.key;
+}
+
+// Keeps track of the current state of the reservoir sampling algorithm. The
samples are
+// stored in a dynamically sized array. Initially, the samples array is
stored in a
+// separate memory allocation. This class is responsible for managing the
memory of the
+// array and reallocating when the array is full. When this object is
serialized into an
+// output buffer, the samples array is inlined into the output buffer as well.
+template <typename T>
+class ReservoirSampleState {
+ public:
+ ReservoirSampleState(FunctionContext* ctx)
+ : num_samples_(0),
+ capacity_(INIT_CAPACITY),
+ source_size_(0),
+ sample_array_inline_(false),
+ samples_(NULL) {
+ // Allocate some initial memory for the samples array.
+ size_t buffer_len = sizeof(ReservoirSample<T>) * capacity_;
+ uint8_t* ptr = ctx->Allocate(buffer_len);
+ if (ptr == NULL) {
+ DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+ return;
+ }
+ samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
+ }
+
+ // Returns a pointer to a ReservoirSample at idx.
+ ReservoirSample<T>* GetSample(int64_t idx) {
+ DCHECK(samples_ != NULL);
+ DCHECK_LT(idx, num_samples_);
+ DCHECK_LE(num_samples_, capacity_);
+ DCHECK_GE(idx, 0);
+ return &samples_[idx];
+ }
+
+ // Adds a sample and increments the source size. Doubles the capacity of the
sample
+ // array if necessary. If max capacity is reached, randomly evicts a sample
(as
+ // required by the algorithm). Returns false if the attempt to double the
capacity
+ // fails, true otherwise.
+ bool AddSample(FunctionContext* ctx, const ReservoirSample<T>& s) {
+ DCHECK(samples_ != NULL);
+ DCHECK_LE(num_samples_, MAX_CAPACITY);
+ if (num_samples_ < MAX_CAPACITY) {
+ if (num_samples_ == capacity_) {
+ bool result = IncreaseCapacity(ctx, capacity_ * 2);
+ if (!result) return false;
+ }
+ DCHECK_LT(num_samples_, capacity_);
+ samples_[num_samples_++] = s;
+ } else {
+ DCHECK_EQ(num_samples_, MAX_CAPACITY);
+ DCHECK(!sample_array_inline_);
+ int64_t idx = GetNext64(source_size_);
+ if (idx < MAX_CAPACITY) samples_[idx] = s;
+ }
+ ++source_size_;
+ return true;
+ }
+
+ // Same as above.
+ bool AddSample(FunctionContext* ctx, const T& s) {
+ return AddSample(ctx, ReservoirSample<T>(s));
+ }
+
+ // Returns a buffer with a serialized ReservoirSampleState and the array of
samples it
+ // contains. The samples array must not be inlined; i.e. it must be in a
separate memory
+ // allocation. Returns a buffer containing this object and inlined samples
array. The
+ // memory containing this object and the samples array is freed. The
serialized object
+ // in the output buffer requires a call to Deserialize() before use.
+ StringVal Serialize(FunctionContext* ctx) {
+ DCHECK(samples_ != NULL);
+ DCHECK(!sample_array_inline_);
+ // Assign keys to the samples that haven't been set (i.e. if serializing
after
+ // Update()). In weighted reservoir sampling the keys are typically
assigned as the
+ // sources are being sampled, but this requires maintaining the samples in
sorted
+ // order (by key) and it accomplishes the same thing at this point because
all data
+ // points coming into Update() get the same weight. When the samples are
later merged,
+ // they do have different weights (set here) that are proportional to the
source_size,
+ // i.e. samples selected from a larger stream are more likely to end up in
the final
+ // sample set. In order to avoid the extra overhead in Update(), we
approximate the
+ // keys by picking random numbers in the range
+ // [(SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE), 1]. This weights the keys by
+ // SOURCE_SIZE and implies that the samples picked had the highest keys,
because
+ // values not sampled would have keys between 0 and
+ // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
+ for (int i = 0; i < num_samples_; ++i) {
+ if (samples_[i].key >= 0) continue;
+ int r = rand() % num_samples_;
+ samples_[i].key = ((double) source_size_ - r) / source_size_;
+ }
+ capacity_ = num_samples_;
+ sample_array_inline_ = true;
+
+ size_t buffer_len = sizeof(ReservoirSampleState<T>) +
+ sizeof(ReservoirSample<T>) * num_samples_;
+ StringVal dst = StringVal::CopyFrom(
+ ctx, reinterpret_cast<uint8_t*>(this), buffer_len);
+ memcpy(dst.ptr + sizeof(ReservoirSampleState<T>),
+ reinterpret_cast<uint8_t*>(samples_), sizeof(ReservoirSample<T>) *
num_samples_);
+ ctx->Free(reinterpret_cast<uint8_t*>(samples_));
+ ctx->Free(reinterpret_cast<uint8_t*>(this));
+ return dst;
+ }
+
+ // Updates the pointer to the samples array. Must be called before using
this object in
+ // Merge().
+ void Deserialize() {
+ DCHECK(sample_array_inline_);
+ samples_ = reinterpret_cast<ReservoirSample<T>*>(this + 1);
+ }
+
+ // Merges the samples in "other_state" into the current state by following
the
+ // reservoir sampling algorithm. If necessary, increases the capacity to fit
the
+ // samples from "other_state". In the case of failure to increase the size
of the
+ // array, returns.
+ void Merge(FunctionContext* ctx, ReservoirSampleState<T>* other_state) {
+ DCHECK(samples_ != NULL);
+ DCHECK_GT(capacity_, 0);
+ other_state->Deserialize();
+ int src_idx = 0;
+ // We can increase the capacity significantly here and skip several
doublings because
+ // we know the number of elements in the other state up front.
+ if (capacity_ < MAX_CAPACITY) {
+ int necessary_capacity = num_samples_ + other_state->num_samples();
+ if (capacity_ < necessary_capacity) {
+ bool result = IncreaseCapacity(ctx, necessary_capacity);
+ if (!result) return;
+ }
+ }
+
+ // First, fill up the dst samples if they don't already exist. The samples
are now
+ // ordered as a min-heap on the key.
+ while (num_samples_ < MAX_CAPACITY && src_idx <
other_state->num_samples()) {
+ DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
+ bool result = AddSample(ctx, *other_state->GetSample(src_idx++));
+ if (!result) return;
+ push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+ }
+
+ // Then for every sample from source, take the sample if the key is
greater than
+ // the minimum key in the min-heap.
+ while (src_idx < other_state->num_samples()) {
+ DCHECK_GE(other_state->GetSample(src_idx)->key, 0);
+ if (other_state->GetSample(src_idx)->key > samples_[0].key) {
+ pop_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+ samples_[MAX_CAPACITY - 1] = *other_state->GetSample(src_idx);
+ push_heap(&samples_[0], &samples_[num_samples_], SampleKeyGreater<T>);
+ }
+ ++src_idx;
+ }
+
+ source_size_ += other_state->source_size();
+ }
+
+ // Returns the median element.
+ T GetMedian(FunctionContext* ctx) {
+ if (num_samples_ == 0) return T::null();
+ ReservoirSample<T>* mid_point = GetSample(num_samples_ / 2);
+ nth_element(&samples_[0], mid_point, &samples_[num_samples_],
SampleValLess<T>);
+ return mid_point->GetValue(ctx);
+ }
+
+ // Sorts the samples.
+ void SortSamples() {
+ sort(&samples_[0], &samples_[num_samples_], SampleValLess<T>);
+ }
+
+ // Deletes this object by freeing the memory that contains the array of
samples (if not
+ // inlined) and itself.
+ void Delete(FunctionContext* ctx) {
+ if (!sample_array_inline_) ctx->Free(reinterpret_cast<uint8_t*>(samples_));
+ ctx->Free(reinterpret_cast<uint8_t*>(this));
+ }
+
+ int num_samples() { return num_samples_; }
+ int64_t source_size() { return source_size_; }
+
+ private:
+ // The initial capacity of the samples array.
+ const static int INIT_CAPACITY = 16;
+
+ // Maximum capacity of the samples array.
+ const static int MAX_CAPACITY = NUM_BUCKETS * NUM_SAMPLES_PER_BUCKET;
// Number of collected samples.
- int num_samples;
+ int num_samples_;
+
+ // Size of the "samples_" array.
+ int capacity_;
// Number of values over which the samples were collected.
- int64_t source_size;
+ int64_t source_size_;
// Random number generator for generating 64-bit integers
// TODO: Replace with mt19937_64 when upgrading boost
- ranlux64_3 rng;
+ ranlux64_3 rng_;
+
+ // True if the array of samples is in the same memory allocation as this
object. If
+ // false, this object is responsible for freeing the memory.
+ bool sample_array_inline_;
+
+ // Points to the array of ReservoirSamples. The array may be located inline
(right after
+ // this object), or in a separate memory allocation.
+ ReservoirSample<T>* samples_;
+
+ // Increases the capacity of the "samples_" array to "new_capacity" rounded
up to a
+ // power of two by reallocating. Should only be called if the samples array
is not
+ // inline. Returns false if the operation fails.
+ bool IncreaseCapacity(FunctionContext* ctx, int new_capacity) {
+ DCHECK(samples_ != NULL);
+ DCHECK(!sample_array_inline_);
+ DCHECK_LT(capacity_, MAX_CAPACITY);
+ DCHECK_GT(new_capacity, capacity_);
+ new_capacity = BitUtil::RoundUpToPowerOfTwo(new_capacity);
+ if (new_capacity > MAX_CAPACITY) new_capacity = MAX_CAPACITY;
+ size_t buffer_len = sizeof(ReservoirSample<T>) * new_capacity;
+ uint8_t* ptr = ctx->Reallocate(reinterpret_cast<uint8_t*>(samples_),
buffer_len);
+ if (ptr == NULL) {
+ DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
+ return false;
+ }
+ samples_ = reinterpret_cast<ReservoirSample<T>*>(ptr);
+ capacity_ = new_capacity;
+ return true;
+ }
+ // Returns a random integer in the range [0, max].
int64_t GetNext64(int64_t max) {
uniform_int<int64_t> dist(0, max);
- return dist(rng);
+ return dist(rng_);
}
};
@@ -967,7 +1211,9 @@ void
AggregateFunctions::ReservoirSampleInit(FunctionContext* ctx, StringVal* ds
DCHECK(!ctx->impl()->state()->GetQueryStatus().ok());
return;
}
- *reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr) =
ReservoirSampleState<T>();
+ ReservoirSampleState<T>* dst_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+ *dst_state = ReservoirSampleState<T>(ctx);
}
template <typename T>
@@ -975,113 +1221,32 @@ void
AggregateFunctions::ReservoirSampleUpdate(FunctionContext* ctx, const T& sr
StringVal* dst) {
if (src.is_null) return;
DCHECK(!dst->is_null);
- DCHECK_EQ(dst->len, sizeof(ReservoirSampleState<T>));
- ReservoirSampleState<T>* state =
reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
-
- if (state->num_samples < NUM_SAMPLES) {
- state->samples[state->num_samples++] = ReservoirSample<T>(src);
- } else {
- int64_t r = state->GetNext64(state->source_size);
- if (r < NUM_SAMPLES) state->samples[r] = ReservoirSample<T>(src);
- }
- ++state->source_size;
+ ReservoirSampleState<T>* dst_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+ dst_state->AddSample(ctx, src);
}
template <typename T>
StringVal AggregateFunctions::ReservoirSampleSerialize(FunctionContext* ctx,
const StringVal& src) {
if (UNLIKELY(src.is_null)) return src;
- StringVal result = StringVal::CopyFrom(ctx, src.ptr, src.len);
- ctx->Free(src.ptr);
- if (UNLIKELY(result.is_null)) return result;
-
- ReservoirSampleState<T>* state =
reinterpret_cast<ReservoirSampleState<T>*>(result.ptr);
- // Assign keys to the samples that haven't been set (i.e. if serializing
after
- // Update()). In weighted reservoir sampling the keys are typically assigned
as the
- // sources are being sampled, but this requires maintaining the samples in
sorted order
- // (by key) and it accomplishes the same thing at this point because all
data points
- // coming into Update() get the same weight. When the samples are later
merged, they do
- // have different weights (set here) that are proportional to the
source_size, i.e.
- // samples selected from a larger stream are more likely to end up in the
final sample
- // set. In order to avoid the extra overhead in Update(), we approximate the
keys by
- // picking random numbers in the range [(SOURCE_SIZE -
SAMPLE_SIZE)/(SOURCE_SIZE), 1].
- // This weights the keys by SOURCE_SIZE and implies that the samples picked
had the
- // highest keys, because values not sampled would have keys between 0 and
- // (SOURCE_SIZE - SAMPLE_SIZE)/(SOURCE_SIZE).
- for (int i = 0; i < state->num_samples; ++i) {
- if (state->samples[i].key >= 0) continue;
- int r = rand() % state->num_samples;
- state->samples[i].key = ((double) state->source_size - r) /
state->source_size;
- }
+ ReservoirSampleState<T>* src_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+ StringVal result = src_state->Serialize(ctx);
return result;
}
template <typename T>
-bool SampleValLess(const ReservoirSample<T>& i, const ReservoirSample<T>& j) {
- return i.val.val < j.val.val;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<StringVal>& i,
- const ReservoirSample<StringVal>& j) {
- int n = min(i.len, j.len);
- int result = memcmp(&i.val[0], &j.val[0], n);
- if (result == 0) return i.len < j.len;
- return result < 0;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<DecimalVal>& i,
- const ReservoirSample<DecimalVal>& j) {
- // Also handles val4 and val8 - the DecimalVal memory layout ensures the
least
- // significant bits overlap in memory.
- return i.val.val16 < j.val.val16;
-}
-
-template <>
-bool SampleValLess(const ReservoirSample<TimestampVal>& i,
- const ReservoirSample<TimestampVal>& j) {
- if (i.val.date == j.val.date) return i.val.time_of_day < j.val.time_of_day;
- else return i.val.date < j.val.date;
-}
-
-template <typename T>
-bool SampleKeyGreater(const ReservoirSample<T>& i, const ReservoirSample<T>&
j) {
- return i.key > j.key;
-}
-
-template <typename T>
void AggregateFunctions::ReservoirSampleMerge(FunctionContext* ctx,
- const StringVal& src_val, StringVal* dst_val) {
- if (src_val.is_null) return;
- DCHECK(!dst_val->is_null);
- DCHECK(!src_val.is_null);
- DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
- DCHECK_EQ(dst_val->len, sizeof(ReservoirSampleState<T>));
- ReservoirSampleState<T>* src =
reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
- ReservoirSampleState<T>* dst =
reinterpret_cast<ReservoirSampleState<T>*>(dst_val->ptr);
-
- int src_idx = 0;
- int src_max = src->num_samples;
- // First, fill up the dst samples if they don't already exist. The samples
are now
- // ordered as a min-heap on the key.
- while (dst->num_samples < NUM_SAMPLES && src_idx < src_max) {
- DCHECK_GE(src->samples[src_idx].key, 0);
- dst->samples[dst->num_samples++] = src->samples[src_idx++];
- push_heap(dst->samples, dst->samples + dst->num_samples,
SampleKeyGreater<T>);
- }
- // Then for every sample from source, take the sample if the key is greater
than
- // the minimum key in the min-heap.
- while (src_idx < src_max) {
- DCHECK_GE(src->samples[src_idx].key, 0);
- if (src->samples[src_idx].key > dst->samples[0].key) {
- pop_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
- dst->samples[NUM_SAMPLES - 1] = src->samples[src_idx];
- push_heap(dst->samples, dst->samples + NUM_SAMPLES, SampleKeyGreater<T>);
- }
- ++src_idx;
- }
- dst->source_size += src->source_size;
+ const StringVal& src, StringVal* dst) {
+ if (src.is_null) return;
+ DCHECK(!dst->is_null);
+ DCHECK(!src.is_null);
+ ReservoirSampleState<T>* src_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+ ReservoirSampleState<T>* dst_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(dst->ptr);
+ dst_state->Merge(ctx, src_state);
}
template <typename T>
@@ -1112,64 +1277,56 @@ void PrintSample(const ReservoirSample<TimestampVal>&
v, ostream* os) {
template <typename T>
StringVal AggregateFunctions::ReservoirSampleFinalize(FunctionContext* ctx,
- const StringVal& src_val) {
- if (UNLIKELY(src_val.is_null)) return src_val;
- DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
- ReservoirSampleState<T>* src =
reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
+ const StringVal& src) {
+ if (UNLIKELY(src.is_null)) return src;
+ ReservoirSampleState<T>* src_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
stringstream out;
- for (int i = 0; i < src->num_samples; ++i) {
- PrintSample<T>(src->samples[i], &out);
- if (i < (src->num_samples - 1)) out << ", ";
+ for (int i = 0; i < src_state->num_samples(); ++i) {
+ PrintSample<T>(*src_state->GetSample(i), &out);
+ if (i < (src_state->num_samples() - 1)) out << ", ";
}
const string& out_str = out.str();
StringVal result_str(ctx, out_str.size());
if (LIKELY(!result_str.is_null)) {
memcpy(result_str.ptr, out_str.c_str(), result_str.len);
}
- ctx->Free(src_val.ptr);
+ src_state->Delete(ctx);
return result_str;
}
template <typename T>
StringVal AggregateFunctions::HistogramFinalize(FunctionContext* ctx,
- const StringVal& src_val) {
- if (UNLIKELY(src_val.is_null)) return src_val;
- DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
+ const StringVal& src) {
+ if (UNLIKELY(src.is_null)) return src;
- ReservoirSampleState<T>* src =
reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
- sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
+ ReservoirSampleState<T>* src_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+ src_state->SortSamples();
stringstream out;
- int num_buckets = min(src->num_samples, NUM_BUCKETS);
- int samples_per_bucket = max(src->num_samples / NUM_BUCKETS, 1);
+ int num_buckets = min(src_state->num_samples(), NUM_BUCKETS);
+ int samples_per_bucket = max(src_state->num_samples() / NUM_BUCKETS, 1);
for (int bucket_idx = 0; bucket_idx < num_buckets; ++bucket_idx) {
int sample_idx = (bucket_idx + 1) * samples_per_bucket - 1;
- PrintSample<T>(src->samples[sample_idx], &out);
+ PrintSample<T>(*(src_state->GetSample(sample_idx)), &out);
if (bucket_idx < (num_buckets - 1)) out << ", ";
}
const string& out_str = out.str();
StringVal result_str = StringVal::CopyFrom(ctx,
reinterpret_cast<const uint8_t*>(out_str.c_str()), out_str.size());
- ctx->Free(src_val.ptr);
+ src_state->Delete(ctx);
return result_str;
}
template <typename T>
-T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx,
- const StringVal& src_val) {
- if (UNLIKELY(src_val.is_null)) return T::null();
- DCHECK_EQ(src_val.len, sizeof(ReservoirSampleState<T>));
-
- ReservoirSampleState<T>* src =
reinterpret_cast<ReservoirSampleState<T>*>(src_val.ptr);
- if (src->num_samples == 0) {
- ctx->Free(src_val.ptr);
- return T::null();
- }
- sort(src->samples, src->samples + src->num_samples, SampleValLess<T>);
-
- T result = src->samples[src->num_samples / 2].GetValue(ctx);
- ctx->Free(src_val.ptr);
+T AggregateFunctions::AppxMedianFinalize(FunctionContext* ctx, const
StringVal& src) {
+ if (UNLIKELY(src.is_null)) return T::null();
+ ReservoirSampleState<T>* src_state =
+ reinterpret_cast<ReservoirSampleState<T>*>(src.ptr);
+ T result = src_state->GetMedian(ctx);
+ src_state->Delete(ctx);
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
----------------------------------------------------------------------
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
index 675c75f..524c63b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test
@@ -1160,6 +1160,8 @@ select histogram(timestamp_col) from
functional.alltypestiny;
STRING
====
---- QUERY
+# IMPALA-4787: appx_median() on a medium sized dataset. This should exercise
merge() with
+# differently sized inputs in the Reservoir Sampling algorithm.
select
appx_median(bool_col),
appx_median(tinyint_col),
@@ -1169,13 +1171,24 @@ appx_median(float_col),
appx_median(double_col),
appx_median(string_col),
appx_median(timestamp_col)
-from alltypestiny
+from alltypes
---- RESULTS
-true,1,1,1,1.100000023841858,10.1,'1',2009-03-01 00:00:00
+true,5,5,5,5.5,50.5,'5',2010-01-01 00:00:00
---- TYPES
BOOLEAN, TINYINT, SMALLINT, INT, FLOAT, DOUBLE, STRING, TIMESTAMP
====
---- QUERY
+# IMPALA-4787: appx_median on a large dataset. This requires several buffer
resizes in the
+# Reservoir Sampling algorithm.
+select appx_median(l_returnflag)
+from tpch.lineitem
+where l_returnflag = "N"
+---- RESULTS
+'N'
+---- TYPES
+STRING
+====
+---- QUERY
# IMPALA-1419: Agg fn containing arithmetic expr on NULL fails
select count(null * 1) from functional.alltypes
---- RESULTS
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/529a5f99/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
----------------------------------------------------------------------
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
index 338f8bd..eb4646c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/alloc-fail-init.test
@@ -33,7 +33,7 @@ FunctionContext::Allocate() failed to allocate 16 bytes.
---- QUERY
select sample(timestamp_col) from functional.alltypes
---- CATCH
-FunctionContext::Allocate() failed to allocate 480232 bytes.
+FunctionContext::Allocate() failed to allocate 248 bytes.
====
---- QUERY
select distinctpc(int_col) from functional.alltypes
@@ -93,5 +93,5 @@ FunctionContextImpl::AllocateLocal() failed to allocate 120
bytes.
---- QUERY
select appx_median(int_col) from functional.alltypes
---- CATCH
-FunctionContext::Allocate() failed to allocate 320232 bytes.
+FunctionContext::Allocate() failed to allocate 248 bytes.
====