This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e90d00  Configurable maxStreamLength for doubles sketches (#11574)
7e90d00 is described below

commit 7e90d00cc0e3e9a01d1a3a7730f5bbf6d276999b
Author: Jihoon Son <[email protected]>
AuthorDate: Tue Aug 31 14:56:37 2021 -0700

    Configurable maxStreamLength for doubles sketches (#11574)
    
    * Configurable maxStreamLength for doubles sketches
    
    * fix equals/hashcode and it test failure
    
    * fix test
    
    * fix it test
    
    * benchmark
    
    * doc
    
    * grouping key
    
    * fix comment
    
    * dependency check
    
    * Update docs/development/extensions-core/datasketches-quantiles.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    * Update docs/querying/sql.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    * Update docs/querying/sql.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    * Update docs/querying/sql.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    * Update docs/querying/sql.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    * Update docs/querying/sql.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    * Update docs/querying/sql.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    * Update docs/querying/sql.md
    
    Co-authored-by: Charles Smith <[email protected]>
    
    Co-authored-by: Charles Smith <[email protected]>
---
 benchmarks/pom.xml                                 |   5 +
 .../apache/druid/benchmark/query/SqlBenchmark.java |  34 +++++-
 .../extensions-core/datasketches-quantiles.md      |   1 +
 docs/querying/sql.md                               |  14 ++-
 .../quantiles/DoublesSketchAggregatorFactory.java  |  70 +++++++----
 .../DoublesSketchBuildBufferAggregator.java        |   2 +-
 .../DoublesSketchBuildVectorAggregator.java        |  24 ++--
 .../quantiles/DoublesSketchMergeAggregator.java    |  12 +-
 .../DoublesSketchMergeAggregatorFactory.java       |  18 ++-
 .../DoublesSketchMergeVectorAggregator.java        |  28 +++--
 .../datasketches/quantiles/DoublesSketches.java    |  53 ++++++++
 .../DoublesSketchApproxQuantileSqlAggregator.java  |  17 ++-
 .../sql/DoublesSketchObjectSqlAggregator.java      |   6 +-
 .../DoublesSketchAggregatorFactoryTest.java        |  70 +++++++++++
 .../quantiles/DoublesSketchAggregatorTest.java     | 136 ++++++++++++++++++++-
 .../DoublesSketchMergeAggregatorFactoryTest.java   |  63 ++++++++++
 .../sql/DoublesSketchSqlAggregatorTest.java        |  39 ++++++
 .../coordinator/duty/ITAutoCompactionTest.java     |  12 +-
 .../druid/sql/calcite/BaseCalciteQueryTest.java    |  78 ++++++++++++
 website/.spelling                                  |   1 +
 20 files changed, 615 insertions(+), 68 deletions(-)

diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 3896a8e..b33df90 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -171,6 +171,11 @@
       <artifactId>datasketches-memory</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index 8ffed2f..7d9a55d 100644
--- 
a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -29,13 +29,19 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import 
org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchApproxQuantileSqlAggregator;
+import 
org.apache.druid.query.aggregation.datasketches.quantiles.sql.DoublesSketchObjectSqlAggregator;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
 import org.apache.druid.segment.generator.GeneratorSchemaInfo;
 import org.apache.druid.segment.generator.SegmentGenerator;
 import org.apache.druid.server.QueryStackTests;
 import org.apache.druid.server.security.AuthTestUtils;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.SqlOperatorConversion;
+import 
org.apache.druid.sql.calcite.expression.builtin.QueryLookupOperatorConversion;
 import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.sql.calcite.planner.DruidOperatorTable;
 import org.apache.druid.sql.calcite.planner.DruidPlanner;
 import org.apache.druid.sql.calcite.planner.PlannerConfig;
 import org.apache.druid.sql.calcite.planner.PlannerFactory;
@@ -60,8 +66,10 @@ import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
 import javax.annotation.Nullable;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -365,7 +373,12 @@ public class SqlBenchmark
       + "    GROUP BY dimSequential\n"
       + "  )\n"
       + ")\n"
-      + "SELECT * FROM matrix"
+      + "SELECT * FROM matrix",
+
+      // 20: GroupBy, doubles sketches
+      "SELECT dimZipf, APPROX_QUANTILE_DS(sumFloatNormal, 0.5), 
DS_QUANTILES_SKETCH(maxLongUniform) "
+      + "FROM foo "
+      + "GROUP BY 1"
   );
 
   @Param({"5000000"})
@@ -374,7 +387,7 @@ public class SqlBenchmark
   @Param({"false", "force"})
   private String vectorize;
 
-  @Param({"10", "15"})
+  @Param({"20"})
   private String query;
 
   @Nullable
@@ -413,7 +426,7 @@ public class SqlBenchmark
     plannerFactory = new PlannerFactory(
         rootSchema,
         CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
-        CalciteTests.createOperatorTable(),
+        createOperatorTable(),
         CalciteTests.createExprMacroTable(),
         plannerConfig,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
@@ -422,6 +435,21 @@ public class SqlBenchmark
     );
   }
 
+  private static DruidOperatorTable createOperatorTable()
+  {
+    try {
+      final Set<SqlOperatorConversion> extractionOperators = new HashSet<>();
+      
extractionOperators.add(CalciteTests.INJECTOR.getInstance(QueryLookupOperatorConversion.class));
+      final Set<SqlAggregator> aggregators = new HashSet<>();
+      
aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchApproxQuantileSqlAggregator.class));
+      
aggregators.add(CalciteTests.INJECTOR.getInstance(DoublesSketchObjectSqlAggregator.class));
+      return new DruidOperatorTable(aggregators, extractionOperators);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @TearDown(Level.Trial)
   public void tearDown() throws Exception
   {
diff --git a/docs/development/extensions-core/datasketches-quantiles.md 
b/docs/development/extensions-core/datasketches-quantiles.md
index bf9726e..850d848 100644
--- a/docs/development/extensions-core/datasketches-quantiles.md
+++ b/docs/development/extensions-core/datasketches-quantiles.md
@@ -56,6 +56,7 @@ The result of the aggregation is a DoublesSketch that is the 
union of all sketch
 |name|A String for the output (result) name of the calculation.|yes|
 |fieldName|A String for the name of the input field (can contain sketches or 
raw numeric values).|yes|
 |k|Parameter that determines the accuracy and size of the sketch. Higher k 
means higher accuracy but more space to store sketches. Must be a power of 2 
from 2 to 32768. See [accuracy 
information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch)
 in the DataSketches documentation for details.|no, defaults to 128|
+|maxStreamLength|This parameter is a temporary solution to avoid a [known 
issue](https://github.com/apache/druid/issues/11544). It may be removed in a 
future release after the bug is fixed. This parameter defines the maximum 
number of items to store in each sketch. If a sketch reaches the limit, the 
query can throw `IllegalStateException`. To work around this issue, increase the 
maximum stream length. See [accuracy 
information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesS [...]
 
 ### Post Aggregators
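
As a concrete illustration of the new `maxStreamLength` parameter above (a minimal sketch, 
not taken verbatim from this patch; the output name "valueSketch" and input column "value" 
are made up), the value can be passed through the four-argument constructor added further 
down in this patch:

    import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;

    // k = 128 is the documented default; 1_000_000_000L matches the default maxStreamLength.
    DoublesSketchAggregatorFactory factory = new DoublesSketchAggregatorFactory(
        "valueSketch",    // name: output (result) name
        "value",          // fieldName: input column (raw numeric values or sketches)
        128,              // k: must be a power of 2
        1_000_000_000L    // maxStreamLength: maximum number of items per sketch
    );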
 
diff --git a/docs/querying/sql.md b/docs/querying/sql.md
index 4d203f4..f9c765b 100644
--- a/docs/querying/sql.md
+++ b/docs/querying/sql.md
@@ -339,9 +339,9 @@ Only the COUNT, ARRAY_AGG, and STRING_AGG aggregations can 
accept the DISTINCT k
 |`DS_HLL(expr, [lgK, tgtHllType])`|Creates an [HLL 
sketch](../development/extensions-core/datasketches-hll.md) on the values of 
expr, which can be a regular column or a column containing HLL sketches. The 
`lgK` and `tgtHllType` parameters are described in the HLL sketch 
documentation. The [DataSketches 
extension](../development/extensions-core/datasketches-extension.md) must be 
loaded to use this function.|`'0'` (STRING)|
 |`DS_THETA(expr, [size])`|Creates a [Theta 
sketch](../development/extensions-core/datasketches-theta.md) on the values of 
expr, which can be a regular column or a column containing Theta sketches. The 
`size` parameter is described in the Theta sketch documentation. The 
[DataSketches 
extension](../development/extensions-core/datasketches-extension.md) must be 
loaded to use this function.|`'0.0'` (STRING)|
 |`APPROX_QUANTILE(expr, probability, [resolution])`|_Deprecated._ Use 
`APPROX_QUANTILE_DS` instead, which provides a superior 
distribution-independent algorithm with formal error 
guarantees.<br/><br/>Computes approximate quantiles on numeric or 
[approxHistogram](../development/extensions-core/approximate-histograms.md#approximate-histogram-aggregator)
 exprs. The "probability" should be between 0 and 1 (exclusive). The 
"resolution" is the number of centroids to use for the computation. Hi [...]
-|`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles 
on numeric or [Quantiles 
sketch](../development/extensions-core/datasketches-quantiles.md) exprs. The 
"probability" should be between 0 and 1 (exclusive). The `k` parameter is 
described in the Quantiles sketch documentation. The [DataSketches 
extension](../development/extensions-core/datasketches-extension.md) must be 
loaded to use this function.|`NaN`|
+|`APPROX_QUANTILE_DS(expr, probability, [k])`|Computes approximate quantiles 
on numeric or [Quantiles 
sketch](../development/extensions-core/datasketches-quantiles.md) exprs. 
Allowable "probability" values are between 0 and 1, exclusive. The `k` 
parameter is described in the Quantiles sketch documentation. You must load 
[DataSketches 
extension](../development/extensions-core/datasketches-extension.md) to use 
this function.<br/><br/>See the [known 
issue](#a-known-issue-with-approximate-fu [...]
 |`APPROX_QUANTILE_FIXED_BUCKETS(expr, probability, numBuckets, lowerLimit, 
upperLimit, [outlierHandlingMode])`|Computes approximate quantiles on numeric 
or [fixed buckets 
histogram](../development/extensions-core/approximate-histograms.md#fixed-buckets-histogram)
 exprs. The "probability" should be between 0 and 1 (exclusive). The 
`numBuckets`, `lowerLimit`, `upperLimit`, and `outlierHandlingMode` parameters 
are described in the fixed buckets histogram documentation. The [approximate 
hist [...]
-|`DS_QUANTILES_SKETCH(expr, [k])`|Creates a [Quantiles 
sketch](../development/extensions-core/datasketches-quantiles.md) on the values 
of expr, which can be a regular column or a column containing quantiles 
sketches. The `k` parameter is described in the Quantiles sketch documentation. 
The [DataSketches 
extension](../development/extensions-core/datasketches-extension.md) must be 
loaded to use this function.|`'0'` (STRING)|
+|`DS_QUANTILES_SKETCH(expr, [k])`|Creates a [Quantiles 
sketch](../development/extensions-core/datasketches-quantiles.md) on the values 
of `expr`, which can be a regular column or a column containing quantiles 
sketches. The `k` parameter is described in the Quantiles sketch documentation. 
You must load the [DataSketches 
extension](../development/extensions-core/datasketches-extension.md) to use 
this function.<br/><br/>See the [known 
issue](#a-known-issue-with-approximate-functions-based-o [...]
 |`BLOOM_FILTER(expr, numEntries)`|Computes a bloom filter from values produced 
by `expr`, with `numEntries` maximum number of distinct values before false 
positive rate increases. See [bloom filter 
extension](../development/extensions-core/bloom-filter.md) documentation for 
additional details.|Empty base64 encoded bloom filter STRING|
 |`TDIGEST_QUANTILE(expr, quantileFraction, [compression])`|Builds a T-Digest 
sketch on values produced by `expr` and returns the value for the quantile. 
Compression parameter (default value 100) determines the accuracy and size of 
the sketch. Higher compression means higher accuracy but more space to store 
sketches. See [t-digest 
extension](../development/extensions-contrib/tdigestsketch-quantiles.md) 
documentation for additional details.|`Double.NaN`|
 |`TDIGEST_GENERATE_SKETCH(expr, [compression])`|Builds a T-Digest sketch on 
values produced by `expr`. Compression parameter (default value 100) determines 
the accuracy and size of the sketch. Higher compression means higher accuracy 
but more space to store sketches. See [t-digest 
extension](../development/extensions-contrib/tdigestsketch-quantiles.md) 
documentation for additional details.|Empty base64 encoded T-Digest sketch 
STRING|
@@ -820,6 +820,16 @@ either through query context or through Broker 
configuration.
 - Aggregation functions that are labeled as using sketches or approximations, 
such as APPROX_COUNT_DISTINCT, are always
 approximate, regardless of configuration.
 
+#### A known issue with approximate functions based on data sketches
+
+The `APPROX_QUANTILE_DS` and `DS_QUANTILES_SKETCH` functions can fail with an 
`IllegalStateException` if one of the sketches for
+the query hits `maxStreamLength`: the maximum number of items to store in each 
sketch.
+See [GitHub issue 11544](https://github.com/apache/druid/issues/11544) for 
more details.
+To work around the issue, increase the maximum stream length with the 
`approxQuantileDsMaxStreamLength` parameter
+in the query context. Since it is set to 1,000,000,000 by default, you don't 
need to override it in most cases.
+See [accuracy 
information](https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch)
 in the DataSketches documentation for how many bytes are required for a given stream 
length.
+This query context parameter is a temporary solution to avoid the known 
issue. It may be removed in a future release after the bug is fixed.
+
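
A rough illustration (a minimal sketch, not part of the patch itself) of raising the limit 
from Java code that builds a query context; the map below stands in for whatever context 
object accompanies the SQL query:

    import java.util.HashMap;
    import java.util.Map;

    // The key matches CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH defined in
    // DoublesSketchApproxQuantileSqlAggregator in this patch.
    Map<String, Object> queryContext = new HashMap<>();
    queryContext.put("approxQuantileDsMaxStreamLength", 2_000_000_000L); // above the 1,000,000,000 default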
 ### Unsupported features
 
 Druid does not support all SQL features. In particular, the following features 
are not supported.
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
index 27da4a2..4437279 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java
@@ -21,6 +21,7 @@ package 
org.apache.druid.query.aggregation.datasketches.quantiles;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.datasketches.Util;
 import org.apache.datasketches.quantiles.DoublesSketch;
 import org.apache.datasketches.quantiles.DoublesUnion;
@@ -63,24 +64,42 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
   public static final int DEFAULT_K = 128;
 
   // Used for sketch size estimation.
-  private static final long MAX_STREAM_LENGTH = 1_000_000_000;
+  public static final long DEFAULT_MAX_STREAM_LENGTH = 1_000_000_000;
 
   private final String name;
   private final String fieldName;
   private final int k;
+  private final long maxStreamLength;
   private final byte cacheTypeId;
 
   @JsonCreator
   public DoublesSketchAggregatorFactory(
       @JsonProperty("name") final String name,
       @JsonProperty("fieldName") final String fieldName,
-      @JsonProperty("k") final Integer k
+      @JsonProperty("k") @Nullable final Integer k,
+      @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength
   )
   {
-    this(name, fieldName, k, 
AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
+    this(name, fieldName, k, maxStreamLength, 
AggregatorUtil.QUANTILES_DOUBLES_SKETCH_BUILD_CACHE_TYPE_ID);
   }
 
-  DoublesSketchAggregatorFactory(final String name, final String fieldName, 
final Integer k, final byte cacheTypeId)
+  @VisibleForTesting
+  public DoublesSketchAggregatorFactory(
+      final String name,
+      final String fieldName,
+      @Nullable final Integer k
+  )
+  {
+    this(name, fieldName, k, null);
+  }
+
+  DoublesSketchAggregatorFactory(
+      final String name,
+      final String fieldName,
+      @Nullable final Integer k,
+      @Nullable final Long maxStreamLength,
+      final byte cacheTypeId
+  )
   {
     if (name == null) {
       throw new IAE("Must have a valid, non-null aggregator name");
@@ -92,6 +111,7 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
     this.fieldName = fieldName;
     this.k = k == null ? DEFAULT_K : k;
     Util.checkIfPowerOf2(this.k, "k");
+    this.maxStreamLength = maxStreamLength == null ? DEFAULT_MAX_STREAM_LENGTH 
: maxStreamLength;
     this.cacheTypeId = cacheTypeId;
   }
 
@@ -266,6 +286,12 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
     return k;
   }
 
+  @JsonProperty
+  public long getMaxStreamLength()
+  {
+    return maxStreamLength;
+  }
+
   @Override
   public List<String> requiredFields()
   {
@@ -278,7 +304,7 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
   @Override
   public int getMaxIntermediateSize()
   {
-    return DoublesSketch.getUpdatableStorageBytes(k, MAX_STREAM_LENGTH);
+    return DoublesSketch.getUpdatableStorageBytes(k, maxStreamLength);
   }
 
   @Override
@@ -288,7 +314,8 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
         new DoublesSketchAggregatorFactory(
             fieldName,
             fieldName,
-            k
+            k,
+            maxStreamLength
         )
     );
   }
@@ -296,7 +323,7 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new DoublesSketchMergeAggregatorFactory(name, k);
+    return new DoublesSketchMergeAggregatorFactory(name, k, maxStreamLength);
   }
 
   @Override
@@ -306,7 +333,11 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
       // DoublesUnion supports inputs with different k.
       // The result will have effective k between the specified k and the 
minimum k from all input sketches
       // to achieve higher accuracy as much as possible.
-      return new DoublesSketchMergeAggregatorFactory(name, Math.max(k, 
((DoublesSketchAggregatorFactory) other).k));
+      return new DoublesSketchMergeAggregatorFactory(
+          name,
+          Math.max(k, ((DoublesSketchAggregatorFactory) other).k),
+          maxStreamLength
+      );
     } else {
       throw new AggregatorFactoryNotMergeableException(this, other);
     }
@@ -343,35 +374,30 @@ public class DoublesSketchAggregatorFactory extends 
AggregatorFactory
   @Override
   public byte[] getCacheKey()
   {
+    // maxStreamLength is not included in the cache key because it does not 
affect the query result.
     return new 
CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build();
   }
 
   @Override
-  public boolean equals(final Object o)
+  public boolean equals(Object o)
   {
     if (this == o) {
       return true;
     }
-    if (o == null || !getClass().equals(o.getClass())) {
-      return false;
-    }
-    final DoublesSketchAggregatorFactory that = 
(DoublesSketchAggregatorFactory) o;
-    if (!name.equals(that.name)) {
-      return false;
-    }
-    if (!fieldName.equals(that.fieldName)) {
-      return false;
-    }
-    if (k != that.k) {
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    return true;
+    DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) o;
+    return k == that.k
+           && maxStreamLength == that.maxStreamLength
+           && name.equals(that.name)
+           && fieldName.equals(that.fieldName);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(name, fieldName, k); // no need to use cacheTypeId here
+    return Objects.hash(name, fieldName, k, maxStreamLength); // no need to 
use cacheTypeId here
   }
 
   @Override
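
To give a sense of the size impact of the new parameter on getMaxIntermediateSize() above, 
here is a minimal sketch using the same DataSketches call the factory now makes; the two 
byte counts are the ones asserted in the new DoublesSketchAggregatorFactoryTest later in 
this patch:

    import org.apache.datasketches.quantiles.DoublesSketch;

    // Per-sketch intermediate buffer size for k = 128:
    int defaultSize = DoublesSketch.getUpdatableStorageBytes(128, 1_000_000_000L);     // 24608 bytes
    int largerSize  = DoublesSketch.getUpdatableStorageBytes(128, 1_000_000_000_000L); // 34848 bytes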
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
index 74be19b..c2529ac 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java
@@ -57,7 +57,7 @@ public class DoublesSketchBuildBufferAggregator implements 
BufferAggregator
     }
 
     final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, 
position);
-    sketch.update(selector.getDouble());
+    DoublesSketches.handleMaxStreamLengthLimit(() -> 
sketch.update(selector.getDouble()));
   }
 
   @Nullable
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
index af29c5b..c1074f5 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildVectorAggregator.java
@@ -55,11 +55,13 @@ public class DoublesSketchBuildVectorAggregator implements 
VectorAggregator
 
     final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, 
position);
 
-    for (int i = startRow; i < endRow; i++) {
-      if (nulls == null || !nulls[i]) {
-        sketch.update(doubles[i]);
+    DoublesSketches.handleMaxStreamLengthLimit(() -> {
+      for (int i = startRow; i < endRow; i++) {
+        if (nulls == null || !nulls[i]) {
+          sketch.update(doubles[i]);
+        }
       }
-    }
+    });
   }
 
   @Override
@@ -74,14 +76,16 @@ public class DoublesSketchBuildVectorAggregator implements 
VectorAggregator
     final double[] doubles = selector.getDoubleVector();
     final boolean[] nulls = selector.getNullVector();
 
-    for (int i = 0; i < numRows; i++) {
-      final int idx = rows != null ? rows[i] : i;
+    DoublesSketches.handleMaxStreamLengthLimit(() -> {
+      for (int i = 0; i < numRows; i++) {
+        final int idx = rows != null ? rows[i] : i;
 
-      if (nulls == null || !nulls[idx]) {
-        final int position = positions[i] + positionOffset;
-        helper.getSketchAtPosition(buf, position).update(doubles[idx]);
+        if (nulls == null || !nulls[idx]) {
+          final int position = positions[i] + positionOffset;
+          helper.getSketchAtPosition(buf, position).update(doubles[idx]);
+        }
       }
-    }
+    });
   }
 
   @Override
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
index 6693742..a5f12d2 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregator.java
@@ -76,10 +76,12 @@ public class DoublesSketchMergeAggregator implements 
Aggregator
     if (object == null) {
       return;
     }
-    if (object instanceof DoublesSketch) {
-      union.update((DoublesSketch) object);
-    } else {
-      union.update(selector.getDouble());
-    }
+    DoublesSketches.handleMaxStreamLengthLimit(() -> {
+      if (object instanceof DoublesSketch) {
+        union.update((DoublesSketch) object);
+      } else {
+        union.update(selector.getDouble());
+      }
+    });
   }
 }
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java
index 14e8114..c8cd6b0 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactory.java
@@ -21,6 +21,7 @@ package 
org.apache.druid.query.aggregation.datasketches.quantiles;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.datasketches.quantiles.DoublesSketch;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.AggregatorUtil;
@@ -29,15 +30,28 @@ import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.NilColumnValueSelector;
 
+import javax.annotation.Nullable;
+
 public class DoublesSketchMergeAggregatorFactory extends 
DoublesSketchAggregatorFactory
 {
 
   @JsonCreator
   public DoublesSketchMergeAggregatorFactory(
       @JsonProperty("name") final String name,
-      @JsonProperty("k") final Integer k)
+      @JsonProperty("k") @Nullable final Integer k,
+      @JsonProperty("maxStreamLength") @Nullable final Long maxStreamLength
+  )
+  {
+    super(name, name, k, maxStreamLength, 
AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID);
+  }
+
+  @VisibleForTesting
+  DoublesSketchMergeAggregatorFactory(
+      final String name,
+      @Nullable final Integer k
+  )
   {
-    super(name, name, k, 
AggregatorUtil.QUANTILES_DOUBLES_SKETCH_MERGE_CACHE_TYPE_ID);
+    this(name, k, null);
   }
 
   @Override
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
index 8a8e10b..92437d0 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeVectorAggregator.java
@@ -55,12 +55,14 @@ public class DoublesSketchMergeVectorAggregator implements 
VectorAggregator
 
     final DoublesUnion union = helper.getSketchAtPosition(buf, position);
 
-    for (int i = startRow; i < endRow; i++) {
-      final DoublesSketch sketch = (DoublesSketch) vector[i];
-      if (sketch != null) {
-        union.update(sketch);
+    DoublesSketches.handleMaxStreamLengthLimit(() -> {
+      for (int i = startRow; i < endRow; i++) {
+        final DoublesSketch sketch = (DoublesSketch) vector[i];
+        if (sketch != null) {
+          union.update(sketch);
+        }
       }
-    }
+    });
   }
 
   @Override
@@ -74,15 +76,17 @@ public class DoublesSketchMergeVectorAggregator implements 
VectorAggregator
   {
     final Object[] vector = selector.getObjectVector();
 
-    for (int i = 0; i < numRows; i++) {
-      final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? 
rows[i] : i];
+    DoublesSketches.handleMaxStreamLengthLimit(() -> {
+      for (int i = 0; i < numRows; i++) {
+        final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? 
rows[i] : i];
 
-      if (sketch != null) {
-        final int position = positions[i] + positionOffset;
-        final DoublesUnion union = helper.getSketchAtPosition(buf, position);
-        union.update(sketch);
+        if (sketch != null) {
+          final int position = positions[i] + positionOffset;
+          final DoublesUnion union = helper.getSketchAtPosition(buf, position);
+          union.update(sketch);
+        }
       }
-    }
+    });
   }
 
   @Nullable
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java
new file mode 100644
index 0000000..d7fc420
--- /dev/null
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketches.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import org.apache.druid.java.util.common.ISE;
+
+public final class DoublesSketches
+{
+  /**
+   * Runs the given task that updates a Doubles sketch backed by a direct byte 
buffer. This method intentionally
+   * accepts the update task as a {@link Runnable} instead of accepting 
parameters of a sketch and other values
+   * needed for the update. This is to avoid any potential performance impact 
especially when the sketch is updated
+   * in a tight loop. The update task can throw NullPointerException because 
of the known issue filed in
+   * https://github.com/apache/druid/issues/11544. This method catches NPE and 
converts it to a more user-friendly
+   * exception. This method should be removed once the known bug above is 
fixed.
+   */
+  public static void handleMaxStreamLengthLimit(Runnable updateSketchTask)
+  {
+    try {
+      updateSketchTask.run();
+    }
+    catch (NullPointerException e) {
+      throw new ISE(
+          e,
+          "NullPointerException was thrown while updating Doubles sketch. "
+          + "This exception is potentially caused by the known bug filed in 
https://github.com/apache/druid/issues/11544. "
+          + "If that is the case, you can work around it by using a higher 
maxStreamLength. "
+          + "See 
https://druid.apache.org/docs/latest/development/extensions-core/datasketches-quantiles.html
 for more details."
+      );
+    }
+  }
+
+  private DoublesSketches()
+  {
+  }
+}
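
A minimal usage sketch of the new helper, assuming a heap sketch built through the standard 
DataSketches builder API (the builder call itself is not part of this patch); the aggregator 
changes below wrap their update loops in exactly this way:

    import org.apache.datasketches.quantiles.DoublesSketch;
    import org.apache.datasketches.quantiles.UpdateDoublesSketch;

    UpdateDoublesSketch sketch = DoublesSketch.builder().setK(128).build();
    // A NullPointerException thrown inside the task is rethrown as an ISE pointing at issue 11544.
    DoublesSketches.handleMaxStreamLengthLimit(() -> sketch.update(1.0));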
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
index 69a07dd..8cab843 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchApproxQuantileSqlAggregator.java
@@ -32,6 +32,7 @@ import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.ReturnTypes;
 import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.druid.java.util.common.Numbers;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import 
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
@@ -51,9 +52,12 @@ import 
org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
 
 import javax.annotation.Nullable;
 import java.util.List;
+import java.util.Map;
 
 public class DoublesSketchApproxQuantileSqlAggregator implements SqlAggregator
 {
+  public static final String CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH = 
"approxQuantileDsMaxStreamLength";
+
   private static final SqlAggFunction FUNCTION_INSTANCE = new 
DoublesSketchApproxQuantileSqlAggFunction();
   private static final String NAME = "APPROX_QUANTILE_DS";
 
@@ -169,7 +173,8 @@ public class DoublesSketchApproxQuantileSqlAggregator 
implements SqlAggregator
       aggregatorFactory = new DoublesSketchAggregatorFactory(
           histogramName,
           input.getDirectColumn(),
-          k
+          k,
+          getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext())
       );
     } else {
       VirtualColumn virtualColumn = 
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
@@ -180,7 +185,8 @@ public class DoublesSketchApproxQuantileSqlAggregator 
implements SqlAggregator
       aggregatorFactory = new DoublesSketchAggregatorFactory(
           histogramName,
           virtualColumn.getOutputName(),
-          k
+          k,
+          getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext())
       );
     }
 
@@ -197,6 +203,13 @@ public class DoublesSketchApproxQuantileSqlAggregator 
implements SqlAggregator
     );
   }
 
+  @Nullable
+  static Long getMaxStreamLengthFromQueryContext(Map<String, Object> 
queryContext)
+  {
+    final Object val = 
queryContext.get(CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH);
+    return val == null ? null : Numbers.parseLong(val);
+  }
+
   private static class DoublesSketchApproxQuantileSqlAggFunction extends 
SqlAggFunction
   {
     private static final String SIGNATURE1 = "'" + NAME + "(column, 
probability)'\n";
diff --git 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
index 76d3e9c..d9b84e7 100644
--- 
a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
+++ 
b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchObjectSqlAggregator.java
@@ -113,7 +113,8 @@ public class DoublesSketchObjectSqlAggregator implements 
SqlAggregator
       aggregatorFactory = new DoublesSketchAggregatorFactory(
           histogramName,
           input.getDirectColumn(),
-          k
+          k,
+          
DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext())
       );
     } else {
       VirtualColumn virtualColumn = 
virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
@@ -124,7 +125,8 @@ public class DoublesSketchObjectSqlAggregator implements 
SqlAggregator
       aggregatorFactory = new DoublesSketchAggregatorFactory(
           histogramName,
           virtualColumn.getOutputName(),
-          k
+          k,
+          
DoublesSketchApproxQuantileSqlAggregator.getMaxStreamLengthFromQueryContext(plannerContext.getQueryContext())
       );
     }
 
diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
index 0b5f80e..c18212c 100644
--- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java
@@ -19,8 +19,13 @@
 
 package org.apache.druid.query.aggregation.datasketches.quantiles;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.query.Druids;
+import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
 import 
org.apache.druid.query.aggregation.post.FinalizingFieldAccessPostAggregator;
@@ -31,9 +36,74 @@ import org.apache.druid.segment.column.ValueType;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+
 public class DoublesSketchAggregatorFactoryTest
 {
   @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(DoublesSketchAggregatorFactory.class)
+                  .withNonnullFields("name", "fieldName")
+                  .withIgnoredFields("cacheTypeId")
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.registerSubtypes(new 
NamedType(DoublesSketchAggregatorFactory.class, 
DoublesSketchModule.DOUBLES_SKETCH));
+    final DoublesSketchAggregatorFactory factory = new 
DoublesSketchAggregatorFactory(
+        "myFactory",
+        "myField",
+        1024,
+        1000L
+    );
+    final byte[] json = mapper.writeValueAsBytes(factory);
+    final DoublesSketchAggregatorFactory fromJson = 
(DoublesSketchAggregatorFactory) mapper.readValue(
+        json,
+        AggregatorFactory.class
+    );
+    Assert.assertEquals(factory, fromJson);
+  }
+
+  @Test
+  public void testDefaultParams()
+  {
+    final DoublesSketchAggregatorFactory factory = new 
DoublesSketchAggregatorFactory(
+        "myFactory",
+        "myField",
+        null,
+        null
+    );
+
+    Assert.assertEquals(DoublesSketchAggregatorFactory.DEFAULT_K, 
factory.getK());
+    
Assert.assertEquals(DoublesSketchAggregatorFactory.DEFAULT_MAX_STREAM_LENGTH, 
factory.getMaxStreamLength());
+  }
+
+  @Test
+  public void testMaxIntermediateSize()
+  {
+    DoublesSketchAggregatorFactory factory = new 
DoublesSketchAggregatorFactory(
+        "myFactory",
+        "myField",
+        128,
+        null
+    );
+    Assert.assertEquals(24608L, factory.getMaxIntermediateSize());
+
+    factory = new DoublesSketchAggregatorFactory(
+        "myFactory",
+        "myField",
+        128,
+        1_000_000_000_000L
+    );
+    Assert.assertEquals(34848L, factory.getMaxIntermediateSize());
+  }
+
+  @Test
   public void testResultArraySignature()
   {
     final TimeseriesQuery query =
diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
index 8152369..d20f369 100644
--- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorTest.java
@@ -31,11 +31,15 @@ import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
 import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -49,15 +53,19 @@ import java.util.List;
 @RunWith(Parameterized.class)
 public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest
 {
-
+  private final GroupByQueryConfig config;
   private final AggregationTestHelper helper;
   private final AggregationTestHelper timeSeriesHelper;
 
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
   public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final 
String vectorize)
   {
+    this.config = config;
     DoublesSketchModule.registerSerde();
     DoublesSketchModule module = new DoublesSketchModule();
     helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
@@ -534,4 +542,130 @@ public class DoublesSketchAggregatorTest extends 
InitializedNullHandlingTest
     List<ResultRow> results = seq.toList();
     Assert.assertEquals(1, results.size());
   }
+
+  @Test
+  public void testFailureWhenMaxStreamLengthHit() throws Exception
+  {
+    if 
(GroupByStrategySelector.STRATEGY_V1.equals(config.getDefaultStrategy())) {
+      expectedException.expect(new 
RecursiveExceptionMatcher(IllegalStateException.class));
+      expectedException.expectMessage("NullPointerException was thrown while 
updating Doubles sketch");
+
+      helper.createIndexAndRunQueryOnSegment(
+          new 
File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
+          String.join(
+              "\n",
+              "{",
+              "  \"type\": \"string\",",
+              "  \"parseSpec\": {",
+              "    \"format\": \"tsv\",",
+              "    \"timestampSpec\": {\"column\": \"timestamp\", \"format\": 
\"yyyyMMddHH\"},",
+              "    \"dimensionsSpec\": {",
+              "      \"dimensions\": [\"sequenceNumber\", \"product\"],",
+              "      \"dimensionExclusions\": [],",
+              "      \"spatialDimensions\": []",
+              "    },",
+              "    \"columns\": [\"timestamp\", \"sequenceNumber\", 
\"product\", \"value\"]",
+              "  }",
+              "}"
+          ),
+          "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": 
\"value\"}]",
+          0, // minTimestamp
+          Granularities.NONE,
+          10, // maxRowCount
+          String.join(
+              "\n",
+              "{",
+              "  \"queryType\": \"groupBy\",",
+              "  \"dataSource\": \"test_datasource\",",
+              "  \"granularity\": \"ALL\",",
+              "  \"dimensions\": [],",
+              "  \"aggregations\": [",
+              "    {\"type\": \"quantilesDoublesSketch\", \"name\": 
\"sketch\", \"fieldName\": \"value\", \"k\": 128, \"maxStreamLength\": 10}",
+              "  ],",
+              "  \"postAggregations\": [",
+              "    {\"type\": \"quantilesDoublesSketchToQuantile\", \"name\": 
\"quantile\", \"fraction\": 0.5, \"field\": {\"type\": \"fieldAccess\", 
\"fieldName\": \"sketch\"}},",
+              "    {\"type\": \"quantilesDoublesSketchToQuantiles\", \"name\": 
\"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": 
\"fieldAccess\", \"fieldName\": \"sketch\"}},",
+              "    {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": 
\"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": 
\"fieldAccess\", \"fieldName\": \"sketch\"}}",
+              "  ],",
+              "  \"intervals\": 
[\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+              "}"
+          )
+      );
+    } else {
+      Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+          new 
File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
+          String.join(
+              "\n",
+              "{",
+              "  \"type\": \"string\",",
+              "  \"parseSpec\": {",
+              "    \"format\": \"tsv\",",
+              "    \"timestampSpec\": {\"column\": \"timestamp\", \"format\": 
\"yyyyMMddHH\"},",
+              "    \"dimensionsSpec\": {",
+              "      \"dimensions\": [\"sequenceNumber\", \"product\"],",
+              "      \"dimensionExclusions\": [],",
+              "      \"spatialDimensions\": []",
+              "    },",
+              "    \"columns\": [\"timestamp\", \"sequenceNumber\", 
\"product\", \"value\"]",
+              "  }",
+              "}"
+          ),
+          "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": 
\"value\"}]",
+          0, // minTimestamp
+          Granularities.NONE,
+          10, // maxRowCount
+          String.join(
+              "\n",
+              "{",
+              "  \"queryType\": \"groupBy\",",
+              "  \"dataSource\": \"test_datasource\",",
+              "  \"granularity\": \"ALL\",",
+              "  \"dimensions\": [],",
+              "  \"aggregations\": [",
+              "    {\"type\": \"quantilesDoublesSketch\", \"name\": 
\"sketch\", \"fieldName\": \"value\", \"k\": 128, \"maxStreamLength\": 10}",
+              "  ],",
+              "  \"postAggregations\": [",
+              "    {\"type\": \"quantilesDoublesSketchToQuantile\", \"name\": 
\"quantile\", \"fraction\": 0.5, \"field\": {\"type\": \"fieldAccess\", 
\"fieldName\": \"sketch\"}},",
+              "    {\"type\": \"quantilesDoublesSketchToQuantiles\", \"name\": 
\"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": 
\"fieldAccess\", \"fieldName\": \"sketch\"}},",
+              "    {\"type\": \"quantilesDoublesSketchToHistogram\", \"name\": 
\"histogram\", \"splitPoints\": [0.25, 0.5, 0.75], \"field\": {\"type\": 
\"fieldAccess\", \"fieldName\": \"sketch\"}}",
+              "  ],",
+              "  \"intervals\": 
[\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+              "}"
+          )
+      );
+
+      expectedException.expect(new 
RecursiveExceptionMatcher(IllegalStateException.class));
+      expectedException.expectMessage("NullPointerException was thrown while 
updating Doubles sketch");
+      seq.toList();
+    }
+  }
+
+  private static class RecursiveExceptionMatcher extends BaseMatcher<Object>
+  {
+    private final Class<? extends Throwable> expected;
+
+    private RecursiveExceptionMatcher(Class<? extends Throwable> expected)
+    {
+      this.expected = expected;
+    }
+
+    @Override
+    public boolean matches(Object item)
+    {
+      if (expected.isInstance(item)) {
+        return true;
+      } else if (item instanceof Throwable) {
+        if (((Throwable) item).getCause() != null) {
+          return matches(((Throwable) item).getCause());
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void describeTo(Description description)
+    {
+      description.appendText("a recursive instance of 
").appendText(expected.getName());
+    }
+  }
 }
diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java
new file mode 100644
index 0000000..55d0b12
--- /dev/null
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchMergeAggregatorFactoryTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.datasketches.quantiles;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class DoublesSketchMergeAggregatorFactoryTest
+{
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(DoublesSketchMergeAggregatorFactory.class)
+                  .withNonnullFields("name", "fieldName")
+                  .withIgnoredFields("cacheTypeId")
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.registerSubtypes(
+        new NamedType(DoublesSketchMergeAggregatorFactory.class, 
DoublesSketchModule.DOUBLES_SKETCH_MERGE)
+    );
+    final DoublesSketchMergeAggregatorFactory factory = new 
DoublesSketchMergeAggregatorFactory(
+        "myFactory",
+        1024,
+        1000L
+    );
+    final byte[] json = mapper.writeValueAsBytes(factory);
+    final DoublesSketchMergeAggregatorFactory fromJson = 
(DoublesSketchMergeAggregatorFactory) mapper.readValue(
+        json,
+        AggregatorFactory.class
+    );
+    Assert.assertEquals(factory, fromJson);
+  }
+}
diff --git 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
index 4ebd7c9..bf55031 100644
--- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
+++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/sql/DoublesSketchSqlAggregatorTest.java
@@ -67,7 +67,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class DoublesSketchSqlAggregatorTest extends BaseCalciteQueryTest
 {
@@ -821,6 +823,43 @@ public class DoublesSketchSqlAggregatorTest extends 
BaseCalciteQueryTest
     );
   }
 
+  @Test
+  public void testFailWithSmallMaxStreamLength() throws Exception
+  {
+    final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
+    context.put(
+        
DoublesSketchApproxQuantileSqlAggregator.CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH,
+        1
+    );
+    testQueryThrows(
+        "SELECT\n"
+        + "APPROX_QUANTILE_DS(m1, 0.01),\n"
+        + "APPROX_QUANTILE_DS(cnt, 0.5)\n"
+        + "FROM foo",
+        context,
+        Collections.singletonList(
+            Druids.newTimeseriesQueryBuilder()
+                  .dataSource(CalciteTests.DATASOURCE1)
+                  .intervals(new 
MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+                  .granularity(Granularities.ALL)
+                  .aggregators(ImmutableList.of(
+                      new DoublesSketchAggregatorFactory("a0:agg", "m1", null, 
1L),
+                      new DoublesSketchAggregatorFactory("a1:agg", "cnt", 
null, 1L)
+                  ))
+                  .postAggregators(
+                      new DoublesSketchToQuantilePostAggregator("a0", 
makeFieldAccessPostAgg("a0:agg"), 0.01f),
+                      new DoublesSketchToQuantilePostAggregator("a1", 
makeFieldAccessPostAgg("a1:agg"), 0.50f)
+                  )
+                  .context(context)
+                  .build()
+        ),
+        expectedException -> {
+          expectedException.expect(IllegalStateException.class);
+          expectedException.expectMessage("NullPointerException was thrown 
while updating Doubles sketch");
+        }
+    );
+  }
+
   private static PostAggregator makeFieldAccessPostAgg(String name)
   {
     return new FieldAccessPostAggregator(name, name);
diff --git 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 54d2971..83cc761 100644
--- 
a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ 
b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -112,7 +112,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           fullDatasourceName,
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           0,
-          14312,
+          14370,
           0,
           0,
           2,
@@ -130,7 +130,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           fullDatasourceName,
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           0,
-          22481,
+          22568,
           0,
           0,
           3,
@@ -246,8 +246,8 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
       getAndAssertCompactionStatus(
           fullDatasourceName,
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
-          14312,
-          14311,
+          14370,
+          14369,
           0,
           2,
           2,
@@ -255,7 +255,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           1,
           1,
           0);
-      
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"),
 "14312");
+      
Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"),
 "14370");
       // Run compaction again to compact the remaining day
       // Remaining day compacted (1 new segment). Now both days compacted (2 
total)
       forceTriggerAutoCompaction(2);
@@ -266,7 +266,7 @@ public class ITAutoCompactionTest extends 
AbstractIndexerTest
           fullDatasourceName,
           AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
           0,
-          22481,
+          22568,
           0,
           0,
           3,
diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index b4c3682..876ea5c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -99,6 +99,7 @@ import org.junit.Rule;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -106,6 +107,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -760,6 +762,14 @@ public class BaseCalciteQueryTest extends CalciteTestBase
     Assert.assertEquals(StringUtils.format("result count: %s", sql), 
expectedResults.size(), results.size());
     assertResultsEquals(sql, expectedResults, results);
 
+    verifyQueries(sql, expectedQueries);
+  }
+
+  private void verifyQueries(
+      final String sql,
+      @Nullable final List<Query> expectedQueries
+  )
+  {
     if (expectedQueries != null) {
       final List<Query> recordedQueries = queryLogHook.getRecordedQueries();
 
@@ -789,6 +799,74 @@ public class BaseCalciteQueryTest extends CalciteTestBase
     }
   }
 
+  public void testQueryThrows(
+      final String sql,
+      final Map<String, Object> queryContext,
+      final List<Query> expectedQueries,
+      final Consumer<ExpectedException> expectedExceptionInitializer
+  ) throws Exception
+  {
+    testQueryThrows(
+        PLANNER_CONFIG_DEFAULT,
+        queryContext,
+        DEFAULT_PARAMETERS,
+        sql,
+        CalciteTests.REGULAR_USER_AUTH_RESULT,
+        expectedQueries,
+        expectedExceptionInitializer
+    );
+  }
+
+  public void testQueryThrows(
+      final PlannerConfig plannerConfig,
+      final Map<String, Object> queryContext,
+      final List<SqlParameter> parameters,
+      final String sql,
+      final AuthenticationResult authenticationResult,
+      final List<Query> expectedQueries,
+      final Consumer<ExpectedException> expectedExceptionInitializer
+  ) throws Exception
+  {
+    log.info("SQL: %s", sql);
+
+    final List<String> vectorizeValues = new ArrayList<>();
+
+    vectorizeValues.add("false");
+
+    if (!skipVectorize) {
+      vectorizeValues.add("force");
+    }
+
+    for (final String vectorize : vectorizeValues) {
+      queryLogHook.clearRecordedQueries();
+
+      final Map<String, Object> theQueryContext = new HashMap<>(queryContext);
+      theQueryContext.put(QueryContexts.VECTORIZE_KEY, vectorize);
+      theQueryContext.put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, 
vectorize);
+
+      if (!"false".equals(vectorize)) {
+        theQueryContext.put(QueryContexts.VECTOR_SIZE_KEY, 2); // Small vector 
size to ensure we use more than one.
+      }
+
+      final List<Query> theQueries = new ArrayList<>();
+      for (Query query : expectedQueries) {
+        theQueries.add(recursivelyOverrideContext(query, theQueryContext));
+      }
+
+      if (cannotVectorize && "force".equals(vectorize)) {
+        expectedException.expect(RuntimeException.class);
+        expectedException.expectMessage("Cannot vectorize");
+      } else {
+        expectedExceptionInitializer.accept(expectedException);
+      }
+
+      // This call is expected to throw; the ExpectedException rule set above verifies it.
+      getResults(plannerConfig, theQueryContext, parameters, sql, 
authenticationResult);
+
+      verifyQueries(sql, theQueries);
+    }
+  }
+
   public Set<Resource> analyzeResources(
       PlannerConfig plannerConfig,
       String sql,
diff --git a/website/.spelling b/website/.spelling
index 8d39a18..017402c 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -682,6 +682,7 @@ tgtHllType
  - ../docs/development/extensions-core/datasketches-quantiles.md
 CDF
 DoublesSketch
+maxStreamLength
 PMF
 quantilesDoublesSketch
 toString

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
