This is an automated email from the ASF dual-hosted git repository.
amansinha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new fd331da DRILL-7117: Support creation of equi-depth histogram for selected data types.
fd331da is described below
commit fd331da3872534b284ebd54fec0ff11f7bae47a5
Author: Aman Sinha <[email protected]>
AuthorDate: Wed Feb 13 15:29:04 2019 -0800
DRILL-7117: Support creation of equi-depth histogram for selected data types.
Support int/bigint/float4/float8, time/timestamp/date and boolean.
Build the histogram from the t-digest byte array and serialize it as a JSON string.
More changes for serialization/deserialization.
Add code-gen stubs (empty) for VarChar/VarBinary types.
Address review comments (part 1). Add unit test.
Address review comments (part 2) for sampling.
close apache/drill#1715
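
For context, the flow described above can be sketched end to end with the clearspring
TDigest API that this patch uses (an illustrative sketch only; the Drill function and
statistics plumbing around it is omitted):

    import java.nio.ByteBuffer;
    import com.clearspring.analytics.stream.quantile.TDigest;

    public class TDigestFlowSketch {
      public static void main(String[] args) {
        // Phase 1 (per fragment): accumulate column values into a t-digest.
        TDigest digest = new TDigest(100);  // 100 is the compression factor
        for (int i = 1; i <= 10000; i++) {
          digest.add(i);
        }

        // Serialize with the compact encoding, as the tdigest() aggregate does.
        ByteBuffer buf = ByteBuffer.allocate(digest.smallByteSize());
        digest.asSmallBytes(buf);
        byte[] bytes = buf.array();

        // Later (merge and planning time): rebuild the digest and read quantiles,
        // which become the equi-depth histogram bucket boundaries.
        TDigest restored = TDigest.fromBytes(ByteBuffer.wrap(bytes));
        System.out.println("median ~ " + restored.quantile(0.5));
      }
    }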
---
.../java/org/apache/drill/exec/ExecConstants.java | 8 +
.../drill/exec/expr/fn/impl/TDigestFunctions.java | 1082 ++++++++++++++++++++
.../impl/statistics/MergedStatisticFactory.java | 1 +
.../exec/physical/impl/statistics/Statistic.java | 2 +
.../impl/statistics/StatisticsMergeBatch.java | 5 +-
.../impl/statistics/TDigestMergedStatistic.java | 130 +++
.../drill/exec/planner/common/DrillStatsTable.java | 47 +-
.../drill/exec/planner/common/Histogram.java | 41 +
.../drill/exec/planner/common/HistogramUtils.java | 53 +
.../planner/common/NumericEquiDepthHistogram.java | 114 +++
.../drill/exec/planner/physical/AnalyzePrule.java | 7 +-
.../exec/server/options/SystemOptionManager.java | 3 +-
.../easy/json/JsonStatisticsRecordWriter.java | 6 +-
.../java-exec/src/main/resources/drill-module.conf | 3 +-
.../org/apache/drill/exec/sql/TestAnalyze.java | 61 ++
exec/jdbc-all/pom.xml | 1 +
16 files changed, 1557 insertions(+), 7 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 6d38045..3af8744 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -979,4 +979,12 @@ public final class ExecConstants {
public static final String NDV_BLOOM_FILTER_FPOS_PROB = "exec.statistics.ndv_extrapolation_bf_fpprobability";
public static final LongValidator NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR = new PositiveLongValidator(NDV_BLOOM_FILTER_FPOS_PROB,
100, new OptionDescription("Controls trade-off between NDV statistic computation memory cost and sampling extrapolation accuracy"));
+
+ /**
+ * Controls the 'compression' factor for the TDigest algorithm.
+ */
+ public static final String TDIGEST_COMPRESSION = "exec.statistics.tdigest_compression";
+ public static final LongValidator TDIGEST_COMPRESSION_VALIDATOR = new PositiveLongValidator(TDIGEST_COMPRESSION, 10000,
+ new OptionDescription("Controls trade-off between t-digest quantile statistic storage cost and accuracy. " +
+ "Higher values use more groups (clusters) for the t-digest and improve accuracy at the expense of extra storage."));
}
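
As a rough illustration of the trade-off this option controls (a hedged sketch, not part
of the patch): a larger compression value lets the t-digest keep more centroids, so the
serialized statistic grows while quantile estimates tighten.

    import java.util.Random;
    import com.clearspring.analytics.stream.quantile.TDigest;

    public class CompressionTradeoffSketch {
      public static void main(String[] args) {
        Random rand = new Random(42);
        TDigest coarse = new TDigest(20);    // few centroids: smaller, less accurate
        TDigest fine = new TDigest(1000);    // many centroids: larger, more accurate
        for (int i = 0; i < 100000; i++) {
          double v = rand.nextGaussian();
          coarse.add(v);
          fine.add(v);
        }
        // Serialized size grows with compression; the accuracy of extreme
        // quantiles (e.g. p99) improves correspondingly.
        System.out.println("coarse bytes: " + coarse.smallByteSize());
        System.out.println("fine bytes:   " + fine.smallByteSize());
        System.out.println("coarse p99: " + coarse.quantile(0.99));
        System.out.println("fine p99:   " + fine.quantile(0.99));
      }
    }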
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/TDigestFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/TDigestFunctions.java
new file mode 100644
index 0000000..3be70c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/TDigestFunctions.java
@@ -0,0 +1,1082 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.expr.DrillAggFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.ObjectHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.server.options.OptionManager;
+
+import javax.inject.Inject;
+
+@SuppressWarnings("unused")
+public class TDigestFunctions {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TDigestFunctions.class);
+
+ private TDigestFunctions(){}
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class BigIntTDigestFunction implements DrillAggFunc {
+ @Param BigIntHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableBigIntTDigestFunction implements DrillAggFunc {
+ @Param NullableBigIntHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class IntTDigestFunction implements DrillAggFunc {
+ @Param IntHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableIntTDigestFunction implements DrillAggFunc {
+ @Param NullableIntHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class Float8TDigestFunction implements DrillAggFunc {
+ @Param Float8Holder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableFloat8TDigestFunction implements DrillAggFunc {
+ @Param NullableFloat8Holder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class Float4TDigestFunction implements DrillAggFunc {
+ @Param Float4Holder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableFloat4TDigestFunction implements DrillAggFunc {
+ @Param NullableFloat4Holder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class BitTDigestFunction implements DrillAggFunc {
+ @Param BitHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableBitTDigestFunction implements DrillAggFunc {
+ @Param NullableBitHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class DateTDigestFunction implements DrillAggFunc {
+ @Param DateHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableDateTDigestFunction implements DrillAggFunc {
+ @Param NullableDateHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class TimeTDigestFunction implements DrillAggFunc {
+ @Param TimeHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableTimeTDigestFunction implements DrillAggFunc {
+ @Param NullableTimeHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class TimeStampTDigestFunction implements DrillAggFunc {
+ @Param TimeStampHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ tdigest.add(in.value);
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableTimeStampTDigestFunction implements DrillAggFunc {
+ @Param NullableTimeStampHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ if (in.isSet == 1) {
+ tdigest.add(in.value);
+ } else {
+ // do nothing since we track nulls outside the scope of the histogram
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class VarCharTDigestFunction implements DrillAggFunc {
+ @Param VarCharHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+
+ }
+
+ @Override
+ public void add() {
+
+ }
+
+ @Override
+ public void output() {
+ }
+
+ @Override
+ public void reset() {
+
+ }
+ }
+
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableVarCharTDigestFunction implements DrillAggFunc {
+ @Param NullableVarCharHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+
+ }
+
+ @Override
+ public void add() {
+
+ }
+
+ @Override
+ public void output() {
+ }
+
+ @Override
+ public void reset() {
+
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class VarBinaryTDigestFunction implements DrillAggFunc {
+ @Param VarBinaryHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+
+ }
+
+ @Override
+ public void add() {
+
+ }
+
+ @Override
+ public void output() {
+ }
+
+ @Override
+ public void reset() {
+
+ }
+ }
+
+
+ @FunctionTemplate(name = "tdigest", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class NullableVarBinaryTDigestFunction implements DrillAggFunc {
+ @Param NullableVarBinaryHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+
+ }
+
+ @Override
+ public void add() {
+
+ }
+
+ @Override
+ public void output() {
+ }
+
+ @Override
+ public void reset() {
+
+ }
+ }
+
+ @FunctionTemplate(name = "tdigest_merge", scope =
FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+ public static class TDigestMergeFunction implements DrillAggFunc {
+ @Param NullableVarBinaryHolder in;
+ @Workspace ObjectHolder work;
+ @Output NullableVarBinaryHolder out;
+ @Inject DrillBuf buffer;
+ @Inject OptionManager options;
+ @Workspace IntHolder compression;
+
+ @Override
+ public void setup() {
+ work = new ObjectHolder();
+ compression.value = (int) options.getLong(org.apache.drill.exec.ExecConstants.TDIGEST_COMPRESSION);
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+
+ @Override
+ public void add() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ if (in.isSet != 0) {
+ // read the serialized digest directly out of the incoming buffer; round-tripping
+ // the bytes through a UTF-8 String would corrupt arbitrary binary data
+ byte[] buf = new byte[in.end - in.start];
+ in.buffer.getBytes(in.start, buf, 0, buf.length);
+ com.clearspring.analytics.stream.quantile.TDigest other = com.clearspring.analytics.stream.quantile.TDigest.fromBytes(java.nio.ByteBuffer.wrap(buf));
+ tdigest.add(other);
+ }
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to merge TDigest output", e);
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ if (work.obj != null) {
+ com.clearspring.analytics.stream.quantile.TDigest tdigest = (com.clearspring.analytics.stream.quantile.TDigest) work.obj;
+ try {
+ int size = tdigest.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ tdigest.asSmallBytes(byteBuf);
+ out.buffer = buffer.reallocIfNeeded(size);
+ out.start = 0;
+ out.end = size;
+ out.buffer.setBytes(0, byteBuf.array());
+ out.isSet = 1;
+ } catch (Exception e) {
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get TDigest output", e);
+ }
+ } else {
+ out.isSet = 0;
+ }
+ }
+
+ @Override
+ public void reset() {
+ work.obj = new com.clearspring.analytics.stream.quantile.TDigest(compression.value);
+ }
+ }
+}
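
To see why the two-phase tdigest/tdigest_merge split is sound (an illustrative sketch;
the fragment names are made up): digests built independently over disjoint fragments can
be folded together with TDigest.add(TDigest) and yield quantiles close to those of a
single digest built over all rows.

    import com.clearspring.analytics.stream.quantile.TDigest;

    public class TwoPhaseMergeSketch {
      public static void main(String[] args) {
        TDigest fragment1 = new TDigest(100);
        TDigest fragment2 = new TDigest(100);
        TDigest single = new TDigest(100);
        for (int i = 0; i < 50000; i++) {
          fragment1.add(i);  // rows scanned by fragment 1
          single.add(i);
        }
        for (int i = 50000; i < 100000; i++) {
          fragment2.add(i);  // rows scanned by fragment 2
          single.add(i);
        }
        // Phase 2: tdigest_merge folds the per-fragment digests together.
        TDigest merged = new TDigest(100);
        merged.add(fragment1);
        merged.add(fragment2);
        // merged.quantile(q) should closely track single.quantile(q).
        System.out.println(merged.quantile(0.5) + " vs " + single.quantile(0.5));
      }
    }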
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/MergedStatisticFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/MergedStatisticFactory.java
index c921394..7ffd0ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/MergedStatisticFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/MergedStatisticFactory.java
@@ -35,6 +35,7 @@ public class MergedStatisticFactory {
statsClasses.put(Statistic.HLL_MERGE, HLLMergedStatistic.class);
statsClasses.put(Statistic.NDV, NDVMergedStatistic.class);
statsClasses.put(Statistic.SUM_DUPS, CntDupsMergedStatistic.class);
+ statsClasses.put(Statistic.TDIGEST_MERGE, TDigestMergedStatistic.class);
}
private MergedStatistic newMergedStatistic(String outputStatName)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
index b9f905d..8f5392a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/Statistic.java
@@ -41,4 +41,6 @@ public abstract class Statistic {
public static final String SUM_WIDTH = "sum_width";
public static final String CNT_DUPS = "approx_count_dups";
public static final String SUM_DUPS = "sum";
+ public static final String TDIGEST = "tdigest";
+ public static final String TDIGEST_MERGE = "tdigest_merge";
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
index 95982b7..acf167b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/StatisticsMergeBatch.java
@@ -209,6 +209,8 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
((CntDupsMergedStatistic)statistic).configure(mergedStatisticList);
} else if (statistic.getName().equals(Statistic.HLL_MERGE)) {
((HLLMergedStatistic)statistic).configure(context.getOptions());
+ } else if (statistic.getName().equals(Statistic.TDIGEST_MERGE)) {
+ ((TDigestMergedStatistic)statistic).configure(context.getOptions());
}
}
// Create the schema number and time when computed in the outgoing vector
@@ -259,7 +261,8 @@ public class StatisticsMergeBatch extends AbstractSingleRecordBatch<StatisticsMe
TypeProtos.MinorType minorType;
if (outStatName.equals(Statistic.AVG_WIDTH)) {
minorType = TypeProtos.MinorType.FLOAT8;
- } else if (outStatName.equals(Statistic.HLL_MERGE)) {
+ } else if (outStatName.equals(Statistic.HLL_MERGE) ||
+ outStatName.equals(Statistic.TDIGEST_MERGE)) {
minorType = TypeProtos.MinorType.VARBINARY;
} else {
minorType = TypeProtos.MinorType.BIGINT;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/TDigestMergedStatistic.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/TDigestMergedStatistic.java
new file mode 100644
index 0000000..cccbff6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/statistics/TDigestMergedStatistic.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.statistics;
+
+// Library implementing TDigest algorithm to derive approximate quantiles. Please refer to:
+// 'Computing Extremely Accurate Quantiles using t-Digests' by Ted Dunning and Otmar Ertl
+
+import com.clearspring.analytics.stream.quantile.TDigest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.nio.ByteBuffer;
+
+public class TDigestMergedStatistic extends AbstractMergedStatistic {
+ private Map<String, TDigest> tdigestHolder;
+ private int compression;
+
+ public TDigestMergedStatistic() {
+ this.tdigestHolder = new HashMap<>();
+ state = State.INIT;
+ }
+
+ @Override
+ public void initialize(String inputName, double samplePercent) {
+ super.initialize(Statistic.TDIGEST_MERGE, inputName, samplePercent);
+ state = State.CONFIG;
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public String getInput() {
+ return inputName;
+ }
+
+ @Override
+ public void merge(MapVector input) {
+ // Check the input is a Map Vector
+ assert (input.getField().getType().getMinorType() == TypeProtos.MinorType.MAP);
+ for (ValueVector vv : input) {
+ String colName = vv.getField().getName();
+ TDigest colTdigestHolder = null;
+ if (tdigestHolder.get(colName) != null) {
+ colTdigestHolder = tdigestHolder.get(colName);
+ }
+ NullableVarBinaryVector tdigestVector = (NullableVarBinaryVector) vv;
+ NullableVarBinaryVector.Accessor accessor = tdigestVector.getAccessor();
+
+ if (!accessor.isNull(0)) {
+ TDigest other = TDigest.fromBytes(ByteBuffer.wrap(accessor.get(0)));
+ if (colTdigestHolder != null) {
+ colTdigestHolder.add(other);
+ tdigestHolder.put(colName, colTdigestHolder);
+ } else {
+ tdigestHolder.put(colName, other);
+ }
+ }
+ }
+ }
+
+ public TDigest getStat(String colName) {
+ if (state != State.COMPLETE) {
+ throw new IllegalStateException(String.format("Statistic `%s` has not completed merging statistics", name));
+ }
+ return tdigestHolder.get(colName);
+ }
+
+ @Override
+ public void setOutput(MapVector output) {
+ // Check that the output is a Map Vector
+ assert (output.getField().getType().getMinorType() == TypeProtos.MinorType.MAP);
+ // Dependencies have been configured correctly
+ assert (state == State.MERGE);
+ for (ValueVector outMapCol : output) {
+ String colName = outMapCol.getField().getName();
+ TDigest colTdigestHolder = tdigestHolder.get(colName);
+ NullableVarBinaryVector vv = (NullableVarBinaryVector) outMapCol;
+ vv.allocateNewSafe();
+ try {
+ if (colTdigestHolder != null) {
+ int size = colTdigestHolder.smallByteSize();
+ java.nio.ByteBuffer byteBuf = java.nio.ByteBuffer.allocate(size);
+ colTdigestHolder.asSmallBytes(byteBuf);
+ // NOTE: in setting the VV below, we are using the byte[] instead of the ByteBuffer because the
+ // latter was producing incorrect output (after re-reading the data from the VV). It is
+ // unclear whether the setSafe() api for ByteBuffer has a bug, so to be safe we are using the
+ // byte[] directly which works.
+ vv.getMutator().setSafe(0, byteBuf.array(), 0, byteBuf.array().length);
+ } else {
+ vv.getMutator().setNull(0);
+ }
+ } catch (Exception ex) {
+ // TODO: logger
+ }
+ }
+ state = State.COMPLETE;
+ }
+
+ public void configure(OptionManager optionsManager) {
+ assert (state == State.CONFIG);
+ compression = (int) optionsManager.getLong(ExecConstants.TDIGEST_COMPRESSION);
+ // Now config complete - moving to MERGE state
+ state = State.MERGE;
+ }
+}
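
The heart of merge() above is a fold over a per-column map: adopt the incoming digest if
the column has no running digest yet, otherwise combine the two. A minimal sketch of that
pattern (the column name is hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import com.clearspring.analytics.stream.quantile.TDigest;

    public class PerColumnMergeSketch {
      // Fold an incoming per-column digest into the running digest for that
      // column, or adopt it if none exists yet.
      static void mergeColumn(Map<String, TDigest> holder, String colName, TDigest incoming) {
        TDigest current = holder.get(colName);
        if (current != null) {
          current.add(incoming);
        } else {
          holder.put(colName, incoming);
        }
      }

      public static void main(String[] args) {
        Map<String, TDigest> holder = new HashMap<>();
        TDigest d1 = new TDigest(100);
        d1.add(1.0);
        TDigest d2 = new TDigest(100);
        d2.add(2.0);
        mergeColumn(holder, "employee_id", d1);
        mergeColumn(holder, "employee_id", d2);
        System.out.println(holder.get("employee_id").size());  // 2 data points
      }
    }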
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
index 7e030da..24cb888 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
@@ -63,12 +63,16 @@ public class DrillStatsTable {
public enum STATS_VERSION {V0, V1};
// The current version
public static final STATS_VERSION CURRENT_VERSION = STATS_VERSION.V1;
+ // 10 histogram buckets (TODO: can make this configurable later)
+ public static final int NUM_HISTOGRAM_BUCKETS = 10;
+
private final FileSystem fs;
private final Path tablePath;
private final String schemaName;
private final String tableName;
- private double rowCount = -1;
private final Map<String, Long> ndv = Maps.newHashMap();
+ private final Map<String, Histogram> histogram = Maps.newHashMap();
+ private double rowCount = -1;
private final Map<String, Long> nnRowCount = Maps.newHashMap();
private boolean materialized = false;
private TableStatistics statistics = null;
@@ -163,6 +167,30 @@ public class DrillStatsTable {
}
/**
+ * Get the histogram of a given column. If stats are not present for the given column, null is returned.
+ *
+ * Note: returned data may not be accurate. Accuracy depends on whether the table data has changed after the stats were computed.
+ *
+ * @param col - column name
+ * @return Histogram for this column
+ */
+ public Histogram getHistogram(String col) {
+ // Stats might not have materialized because of errors.
+ if (!materialized) {
+ return null;
+ }
+ final String upperCol = col.toUpperCase();
+ Histogram histogramCol = histogram.get(upperCol);
+ if (histogramCol == null) {
+ histogramCol = histogram.get(SchemaPath.getSimplePath(upperCol).toString());
+ }
+ return histogramCol;
+ }
+
+
+ /**
* Read the stats from storage and keep them in memory.
* @param table - Drill table for which we require stats
* @param context - Query context
@@ -184,6 +212,10 @@ public class DrillStatsTable {
ndv.put(cs.getName().toUpperCase(), cs.getNdv());
nnRowCount.put(cs.getName().toUpperCase(), (long)cs.getNonNullCount());
rowCount = Math.max(rowCount, cs.getCount());
+
+ // get the histogram for this column
+ Histogram hist = cs.getHistogram();
+ histogram.put(cs.getName(), hist);
}
}
}
@@ -330,6 +362,7 @@ public class DrillStatsTable {
@JsonProperty ("nonnullrowcount") private long nonNullCount = 0;
@JsonProperty ("ndv") private long ndv = 0;
@JsonProperty ("avgwidth") private double width = 0;
+ @JsonProperty ("histogram") private Histogram histogram = null;
public ColumnStatistics_v1() {}
@JsonGetter ("column")
@@ -380,6 +413,18 @@ public class DrillStatsTable {
}
@JsonSetter ("avgwidth")
public void setAvgWidth(double width) { this.width = width; }
+ @JsonGetter("histogram")
+ public Histogram getHistogram() { return this.histogram; }
+ @JsonSetter("histogram")
+ public void setHistogram(Histogram histogram) {
+ this.histogram = histogram;
+ }
+ @JsonIgnore
+ public void buildHistogram(byte[] tdigest_bytearray) {
+ int num_buckets = (int) Math.min(ndv, (long) DrillStatsTable.NUM_HISTOGRAM_BUCKETS);
+ this.histogram = HistogramUtils.buildHistogramFromTDigest(tdigest_bytearray, this.getType(), num_buckets, nonNullCount);
+ }
}
private TableStatistics readStatistics(DrillTable drillTable, Path path) throws IOException {
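
Since the commit message says the histogram is serialized as a JSON string, a minimal
Jackson round-trip sketch (assuming the Histogram and NumericEquiDepthHistogram classes
from this patch are on the classpath) shows the shape of the output:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.drill.exec.planner.common.Histogram;
    import org.apache.drill.exec.planner.common.NumericEquiDepthHistogram;

    public class HistogramJsonSketch {
      public static void main(String[] args) throws Exception {
        NumericEquiDepthHistogram h = new NumericEquiDepthHistogram(2);
        h.setNumRowsPerBucket(50);
        ObjectMapper mapper = new ObjectMapper();
        // The @JsonTypeInfo/@JsonSubTypes annotations on Histogram add a
        // "category" discriminator, so the JSON looks roughly like:
        //   {"category":"numeric-equi-depth","numRowsPerBucket":50,"buckets":[0.0,0.0,0.0]}
        String json = mapper.writeValueAsString(h);
        Histogram back = mapper.readValue(json, Histogram.class);
        System.out.println(json + " -> " + back.getClass().getSimpleName());
      }
    }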
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/Histogram.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/Histogram.java
new file mode 100644
index 0000000..1980ee5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/Histogram.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * A column specific histogram
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "category")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = NumericEquiDepthHistogram.class, name = "numeric-equi-depth")
+})
+public interface Histogram {
+
+ /**
+ * For a filter condition, estimate the selectivity (matching rows/total rows) for this histogram.
+ * @param filter - the filter condition
+ * @return estimated selectivity, or null if it could not be estimated for any reason
+ */
+ Double estimatedSelectivity(RexNode filter);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/HistogramUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/HistogramUtils.java
new file mode 100644
index 0000000..f19b259
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/HistogramUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.drill.common.types.TypeProtos;
+
+public class HistogramUtils {
+
+ /**
+ * Build a histogram using the t-digest byte array.
+ * The type of histogram is dependent on the data type of the column.
+ */
+ public static Histogram buildHistogramFromTDigest(byte[] tdigest_bytearray,
+ TypeProtos.MajorType type,
+ int numBuckets,
+ long nonNullCount) {
+ Histogram histogram = null;
+ if (type != null && type.hasMinorType()) {
+ switch (type.getMinorType()) {
+ case INT:
+ case BIGINT:
+ case FLOAT4:
+ case FLOAT8:
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case BIT:
+ histogram = NumericEquiDepthHistogram.buildFromTDigest(tdigest_bytearray, numBuckets, nonNullCount);
+ break;
+ default:
+ // TODO: support other data types
+ break;
+ }
+ }
+ return histogram;
+ }
+
+}
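
A usage sketch for the dispatch above (hypothetical values; Types.required() is Drill's
helper for building a MajorType): a BIGINT column falls into the numeric branch and
yields an equi-depth histogram, while an unsupported type would return null.

    import java.nio.ByteBuffer;
    import org.apache.drill.common.types.TypeProtos;
    import org.apache.drill.common.types.Types;
    import org.apache.drill.exec.planner.common.Histogram;
    import org.apache.drill.exec.planner.common.HistogramUtils;
    import com.clearspring.analytics.stream.quantile.TDigest;

    public class BuildHistogramSketch {
      public static void main(String[] args) {
        TDigest digest = new TDigest(100);
        for (int i = 1; i <= 1000; i++) {
          digest.add(i);
        }
        ByteBuffer buf = ByteBuffer.allocate(digest.smallByteSize());
        digest.asSmallBytes(buf);

        Histogram h = HistogramUtils.buildHistogramFromTDigest(
            buf.array(), Types.required(TypeProtos.MinorType.BIGINT), 10, 1000L);
        System.out.println(h != null);  // true for the numeric types listed above
      }
    }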
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/NumericEquiDepthHistogram.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/NumericEquiDepthHistogram.java
new file mode 100644
index 0000000..386141e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/NumericEquiDepthHistogram.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.calcite.rex.RexNode;
+import com.clearspring.analytics.stream.quantile.TDigest;
+
+/**
+ * A column specific equi-depth histogram which is meant for numeric data types
+ */
+@JsonTypeName("numeric-equi-depth")
+public class NumericEquiDepthHistogram implements Histogram {
+
+ // For equi-depth, all buckets will have same (approx) number of rows
+ @JsonProperty("numRowsPerBucket")
+ private long numRowsPerBucket;
+
+ // An array of buckets arranged in increasing order of their start boundaries
+ // Note that the buckets only maintain the start point of the bucket range.
+ // End point is assumed to be the same as the start point of the next bucket, although
+ // when evaluating the filter selectivity we should treat the interval as [start, end)
+ // i.e. closed on the start and open on the end
+ @JsonProperty("buckets")
+ private Double[] buckets;
+
+ // Default constructor for deserializer
+ public NumericEquiDepthHistogram() {}
+
+ public NumericEquiDepthHistogram(int numBuckets) {
+ // If numBuckets = N, we are keeping N + 1 entries since the (N+1)th bucket's
+ // starting value is the MAX value for the column and it becomes the end point of the
+ // Nth bucket.
+ buckets = new Double[numBuckets + 1];
+ for (int i = 0; i < buckets.length; i++) {
+ buckets[i] = 0.0;
+ }
+ numRowsPerBucket = -1;
+ }
+
+ public long getNumRowsPerBucket() {
+ return numRowsPerBucket;
+ }
+
+ public void setNumRowsPerBucket(long numRows) {
+ this.numRowsPerBucket = numRows;
+ }
+
+ public Double[] getBuckets() {
+ return buckets;
+ }
+
+ @Override
+ public Double estimatedSelectivity(RexNode filter) {
+ if (numRowsPerBucket >= 0) {
+ return 1.0;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Utility method to build a Numeric Equi-Depth Histogram from a t-digest byte array
+ * @param tdigest_array - serialized t-digest bytes
+ * @param numBuckets - number of histogram buckets
+ * @param nonNullCount - number of non-null values in the column
+ * @return An instance of NumericEquiDepthHistogram
+ */
+ public static NumericEquiDepthHistogram buildFromTDigest(byte[] tdigest_array,
+ int numBuckets,
+ long nonNullCount) {
+ TDigest tdigest = TDigest.fromBytes(java.nio.ByteBuffer.wrap(tdigest_array));
+ NumericEquiDepthHistogram histogram = new NumericEquiDepthHistogram(numBuckets);
+
+ double q = 1.0/numBuckets;
+ int i = 0;
+ for (; i < numBuckets; i++) {
+ // get the starting point of the i-th quantile
+ double start = tdigest.quantile(q * i);
+ histogram.buckets[i] = start;
+ }
+ // for the N-th bucket, the end point corresponds to the 1.0 quantile but we don't keep the end
+ // points; only the start point, so this is stored as the start point of the (N+1)th bucket
+ histogram.buckets[i] = tdigest.quantile(1.0);
+
+ // Each bucket stores an approximately equal number of rows. Here, we take into consideration the nonNullCount
+ // supplied since the stats may have been collected with sampling. Sampling of 20% means only 20% of the
+ // tuples will be stored in the t-digest. However, the overall stats such as totalRowCount, nonNullCount and
+ // NDV would have already been extrapolated up from the sample. So, we take the max of the t-digest size and
+ // the supplied nonNullCount. Why non-null? Because only non-null values are stored in the t-digest.
+ long numRowsPerBucket = Math.max(tdigest.size(), nonNullCount) / numBuckets;
+ histogram.setNumRowsPerBucket(numRowsPerBucket);
+
+ return histogram;
+ }
+
+}
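To illustrate how the pieces above fit together, here is a minimal, self-contained sketch (not part of the patch) that feeds invented values through a t-digest and builds a histogram from its serialized form. The compression factor of 100 matches the new exec.statistics.tdigest_compression default, and the TDigest calls (add, byteSize, asBytes) are the clearspring library's API:

    import java.nio.ByteBuffer;
    import com.clearspring.analytics.stream.quantile.TDigest;

    public class HistogramSketch {
      public static void main(String[] args) {
        // Simulate 1000 non-null column values flowing into a t-digest
        // with compression factor 100.
        TDigest tdigest = new TDigest(100);
        for (int v = 1; v <= 1000; v++) {
          tdigest.add(v);
        }
        // Serialize to the byte-array form that buildFromTDigest() consumes.
        ByteBuffer buf = ByteBuffer.allocate(tdigest.byteSize());
        tdigest.asBytes(buf);

        // 10 buckets yield 11 boundary entries; boundary i is the (i/10)-th quantile.
        NumericEquiDepthHistogram histogram =
            NumericEquiDepthHistogram.buildFromTDigest(buf.array(), 10, 1000);
        // Without sampling, tdigest.size() == nonNullCount, so each bucket
        // holds max(1000, 1000) / 10 = 100 rows.
        System.out.println(histogram.getNumRowsPerBucket());
      }
    }

With a 20% sample the digest would hold only about 200 values, but the extrapolated nonNullCount of 1000 would still drive numRowsPerBucket to 100; that is exactly what the max() in buildFromTDigest() is for.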
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
index cf41cf8..86bed95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
@@ -47,7 +47,8 @@ public class AnalyzePrule extends Prule {
       Statistic.NNROWCOUNT,   // total number of non-null entries in table fragment
       Statistic.SUM_WIDTH,    // total column width across all entries in table fragment
       Statistic.CNT_DUPS,     // total count of non-singletons in table fragment
-      Statistic.HLL           // total distinct values in table fragment
+      Statistic.HLL,          // total distinct values in table fragment
+      Statistic.TDIGEST       // quantile distribution of values in table fragment
   );
// Mapping between output functions (from StatsMergeBatch) and
@@ -60,6 +61,7 @@ public class AnalyzePrule extends Prule {
PHASE_2_FUNCTIONS.put(Statistic.SUM_DUPS, Statistic.CNT_DUPS);
PHASE_2_FUNCTIONS.put(Statistic.HLL_MERGE, Statistic.HLL);
PHASE_2_FUNCTIONS.put(Statistic.NDV, Statistic.HLL);
+ PHASE_2_FUNCTIONS.put(Statistic.TDIGEST_MERGE, Statistic.TDIGEST);
}
// List of input functions (from StatsMergeBatch) to UnpivotMapsBatch
@@ -69,7 +71,8 @@ public class AnalyzePrule extends Prule {
       Statistic.AVG_WIDTH,     // average column width across all entries in the table
       Statistic.HLL_MERGE,     // total distinct values (computed using hll) in the table
       Statistic.SUM_DUPS,      // total count of duplicate values across all entries in the table
-      Statistic.NDV            // total distinct values across all entries in the table
+      Statistic.NDV,           // total distinct values across all entries in the table
+      Statistic.TDIGEST_MERGE  // quantile distribution of all values in the table
   );
public AnalyzePrule() {
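The phase split above works because t-digests compose: each fragment builds its own digest over the rows it scans, and the merge phase combines the digests without revisiting raw rows. A rough sketch of that composition, assuming the clearspring TDigest's add(TDigest) merge method and invented per-fragment data:

    import com.clearspring.analytics.stream.quantile.TDigest;

    public class TwoPhaseMergeSketch {
      public static void main(String[] args) {
        // Phase 1: each table fragment builds its own digest (Statistic.TDIGEST).
        TDigest fragment1 = new TDigest(100);
        TDigest fragment2 = new TDigest(100);
        for (int v = 0; v < 500; v++) {
          fragment1.add(v);        // rows scanned by fragment 1
          fragment2.add(v + 500);  // rows scanned by fragment 2
        }
        // Phase 2: merge the per-fragment digests (Statistic.TDIGEST_MERGE).
        TDigest merged = new TDigest(100);
        merged.add(fragment1);
        merged.add(fragment2);
        // Quantiles of the merged digest approximate those of the full data set.
        System.out.println(merged.quantile(0.5));  // roughly 500
      }
    }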
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 201d991..3e7465d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -283,7 +283,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoCloseable
       new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR),
       new OptionDefinition(ExecConstants.RM_QUERY_TAGS_VALIDATOR,
         new OptionMetaData(OptionValue.AccessibleScopes.SESSION_AND_QUERY, false, false)),
-      new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR)
+      new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR),
+      new OptionDefinition(ExecConstants.TDIGEST_COMPRESSION_VALIDATOR)
   };
CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
index 6fb8667..da2f70c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonStatisticsRecordWriter.java
@@ -346,7 +346,8 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter {
     public void startField() throws IOException {
       if (!skipNullFields || this.reader.isSet()) {
         if (fieldName.equals(Statistic.HLL)
-            || fieldName.equals(Statistic.HLL_MERGE)) {
+            || fieldName.equals(Statistic.HLL_MERGE)
+            || fieldName.equals(Statistic.TDIGEST_MERGE)) {
           nextField = fieldName;
         }
       }
@@ -363,6 +364,9 @@ public class JsonStatisticsRecordWriter extends JSONBaseStatisticsRecordWriter {
           || nextField.equals(Statistic.HLL_MERGE)) {
         // Do NOT write out the HLL output, since it is not used yet for computing statistics for a
         // subset of partitions in the query OR for computing NDV with incremental statistics.
+      } else if (nextField.equals(Statistic.TDIGEST_MERGE)) {
+        byte[] tdigestBytes = reader.readByteArray();
+        ((DrillStatsTable.ColumnStatistics_v1) columnStatistics).buildHistogram(tdigestBytes);
       }
     }
   }
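Because the histogram is ultimately written into the .stats.drill file as JSON, a quick way to see its wire shape is to round-trip the class through Jackson directly. This is only a sketch: the real stats file nests the histogram inside per-column statistics handled by DrillStatsTable (omitted here), and the exact field order in the output may differ:

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class HistogramJsonSketch {
      public static void main(String[] args) throws Exception {
        NumericEquiDepthHistogram histogram = new NumericEquiDepthHistogram(2);
        histogram.setNumRowsPerBucket(50);

        ObjectMapper mapper = new ObjectMapper();
        // Prints something like: {"numRowsPerBucket":50,"buckets":[0.0,0.0,0.0]}
        String json = mapper.writeValueAsString(histogram);
        System.out.println(json);

        // Deserialization relies on the no-arg constructor added above.
        NumericEquiDepthHistogram back =
            mapper.readValue(json, NumericEquiDepthHistogram.class);
        System.out.println(back.getNumRowsPerBucket());  // 50
      }
    }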
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c711551..e79a9ce 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -685,5 +685,6 @@ drill.exec.options: {
exec.query.return_result_set_for_ddl: true,
# ========= rm related options ===========
exec.rm.queryTags: "",
- exec.rm.queues.wait_for_preferred_nodes: true
+ exec.rm.queues.wait_for_preferred_nodes: true,
+ exec.statistics.tdigest_compression: 100
}
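Users can override the new default of 100 at the session level before running ANALYZE; the snippet below is a hypothetical JDBC client illustration (the connection URL and the value 200 are made up, not part of the patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class TDigestCompressionOverride {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
             Statement stmt = conn.createStatement()) {
          // A larger compression factor keeps more t-digest centroids: better
          // quantile accuracy in the histogram at the cost of more storage.
          stmt.execute("ALTER SESSION SET `exec.statistics.tdigest_compression` = 200");
          stmt.execute("ANALYZE TABLE dfs.tmp.employee1 COMPUTE STATISTICS");
        }
      }
    }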
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
index 04e9b99..b32aa21 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
@@ -368,6 +368,67 @@ public class TestAnalyze extends BaseTestQuery {
test("DROP TABLE dfs.tmp.`parquetStale/1996/Q5`");
}
+  // Test basic histogram creation for int, bigint, double, date, timestamp and boolean data types.
+  // Test that a varchar column does not fail the query but generates empty buckets.
+  // We use repeated_count to check the number of bucket entries, but we do not currently check
+  // the actual bucket contents, since that would require a repeatable t-digest quantile
+  // computation for the histogram, which is future work.
+  @Test
+  public void testHistogramWithDataTypes1() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("ALTER SESSION SET `store.format` = 'parquet'");
+      test("CREATE TABLE dfs.tmp.employee1 AS SELECT employee_id, full_name, "
+          + "case when gender = 'M' then cast(1 as boolean) else cast(0 as boolean) end as is_male, "
+          + " cast(store_id as int) as store_id, cast(department_id as bigint) as department_id, "
+          + " cast(birth_date as date) as birth_date, cast(hire_date as timestamp) as hire_date_and_time, "
+          + " cast(salary as double) as salary from cp.`employee.json` where department_id > 10");
+      test("ANALYZE TABLE dfs.tmp.employee1 COMPUTE STATISTICS");
+
+      testBuilder()
+          .sqlQuery("SELECT tbl.`columns`.`column` as `column`, "
+              + " repeated_count(tbl.`columns`.`histogram`.`buckets`) as num_bucket_entries "
+              + " from (select flatten(`directories`[0].`columns`) as `columns` "
+              + "       from dfs.tmp.`employee1/.stats.drill`) as tbl")
+          .unOrdered()
+          .baselineColumns("column", "num_bucket_entries")
+          .baselineValues("`employee_id`", 11)
+          .baselineValues("`full_name`", 0)
+          .baselineValues("`is_male`", 3)
+          .baselineValues("`store_id`", 11)
+          .baselineValues("`department_id`", 8)
+          .baselineValues("`birth_date`", 11)
+          .baselineValues("`hire_date_and_time`", 7)
+          .baselineValues("`salary`", 11)
+          .go();
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+
+  @Test
+  public void testHistogramWithSubsetColumnsAndSampling() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("ALTER SESSION SET `store.format` = 'parquet'");
+      test("CREATE TABLE dfs.tmp.customer1 AS SELECT * from cp.`tpch/customer.parquet`");
+      test("ANALYZE TABLE dfs.tmp.customer1 COMPUTE STATISTICS (c_custkey, c_nationkey, c_acctbal) SAMPLE 55 PERCENT");
+
+      testBuilder()
+          .sqlQuery("SELECT tbl.`columns`.`column` as `column`, "
+              + " repeated_count(tbl.`columns`.`histogram`.`buckets`) as num_bucket_entries "
+              + " from (select flatten(`directories`[0].`columns`) as `columns` "
+              + "       from dfs.tmp.`customer1/.stats.drill`) as tbl")
+          .unOrdered()
+          .baselineColumns("column", "num_bucket_entries")
+          .baselineValues("`c_custkey`", 11)
+          .baselineValues("`c_nationkey`", 11)
+          .baselineValues("`c_acctbal`", 11)
+          .go();
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+
   // Helper function to verify output of ANALYZE statement
   private void verifyAnalyzeOutput(String query, String message) throws Exception {
     List<QueryDataBatch> result = testRunAndReturn(QueryType.SQL, query);
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 64a3833..2e3d5ae 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -452,6 +452,7 @@
<exclude>org/apache/drill/shaded/guava/com/google/common/graph/**</exclude>
<exclude>org/apache/drill/shaded/guava/com/google/common/collect/Tree*</exclude>
<exclude>org/apache/drill/shaded/guava/com/google/common/collect/Standard*</exclude>
+<exclude>org/apache/drill/shaded/guava/com/google/common/io/BaseEncoding*</exclude>
<exclude>com/google/common/math</exclude>
<exclude>com/google/common/net</exclude>
<exclude>com/google/common/primitives</exclude>