This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new dcd3d87b189 Add Spectator Histogram SQL functions (#18885)
dcd3d87b189 is described below
commit dcd3d87b189252127689a2acacef0911ac9e6886
Author: jtuglu1
AuthorDate: Tue Jan 6 16:48:00 2026 -0800
Add Spectator Histogram SQL functions (#18885)
- `SPECTATOR_COUNT` returns the total number of observations recorded in the histogram (works on both pre-aggregated histogram columns and raw numeric columns).
- `SPECTATOR_PERCENTILE` computes one or more approximate percentiles over a histogram (works on both pre-aggregated histogram columns and raw numeric columns).
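As a rough illustration of what the two functions compute, here is a minimal Java sketch over a toy fixed-bucket histogram. The class `ToyBucketHistogram`, its three bucket boundaries, and its linear-interpolation rule are hypothetical. The real extension uses Spectator's 276 predefined buckets and its own percentile routine, which may differ in detail. Following the extension docs, an empty histogram yields `null` here.

```java
import java.util.Arrays;

/**
 * Hypothetical toy histogram illustrating SPECTATOR_COUNT / SPECTATOR_PERCENTILE
 * semantics. Not the extension's implementation: bucket boundaries and the
 * interpolation rule are assumptions chosen for readability.
 */
final class ToyBucketHistogram {
  // Bucket i covers (upperBounds[i - 1], upperBounds[i]]; bucket 0 starts at 0.
  private final long[] upperBounds;
  private final long[] counts;

  ToyBucketHistogram(long[] upperBounds, long[] counts) {
    if (upperBounds.length != counts.length) {
      throw new IllegalArgumentException("upperBounds and counts must have the same length");
    }
    this.upperBounds = upperBounds.clone();
    this.counts = counts.clone();
  }

  /** Count semantics: the sum of all bucket counts, or null when the histogram is empty. */
  Long count() {
    long total = Arrays.stream(counts).sum();
    return total == 0 ? null : total;
  }

  /**
   * Approximate percentile p in [0, 100]: locate the bucket containing rank
   * p% of the total, then interpolate linearly between that bucket's bounds.
   */
  Double percentile(double p) {
    if (p < 0 || p > 100) {
      throw new IllegalArgumentException("percentile must be in [0, 100]");
    }
    Long total = count();
    if (total == null) {
      return null;
    }
    double rank = p / 100.0 * total;
    long cumulative = 0;
    for (int i = 0; i < counts.length; i++) {
      if (counts[i] == 0) {
        continue; // empty buckets cannot contain the target rank
      }
      long next = cumulative + counts[i];
      if (rank <= next) {
        double lower = i == 0 ? 0 : upperBounds[i - 1];
        double upper = upperBounds[i];
        double fraction = (rank - cumulative) / counts[i];
        return lower + fraction * (upper - lower);
      }
      cumulative = next;
    }
    // Unreachable for p <= 100 (rank never exceeds total); kept for completeness.
    return (double) upperBounds[upperBounds.length - 1];
  }

  public static void main(String[] args) {
    ToyBucketHistogram h =
        new ToyBucketHistogram(new long[] {10, 20, 40}, new long[] {2, 2, 4});
    System.out.println(h.count());        // 8
    System.out.println(h.percentile(50)); // 20.0
    System.out.println(h.percentile(75)); // 30.0
  }
}
```

With this toy data, four of the eight observations fall at or below 20, so the 50th percentile lands on the upper edge of the second bucket.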
---
.../extensions-contrib/spectator-histogram.md | 141 +++++-
docs/querying/sql-aggregations.md | 12 +
extensions-contrib/spectator-histogram/pom.xml | 30 ++
.../SpectatorHistogramAggregatorFactory.java | 2 +-
... => SpectatorHistogramCountPostAggregator.java} | 53 +-
.../histogram/SpectatorHistogramModule.java | 10 +
...SpectatorHistogramPercentilePostAggregator.java | 2 +-
.../sql/SpectatorHistogramCountSqlAggregator.java | 143 ++++++
.../SpectatorHistogramPercentileSqlAggregator.java | 276 +++++++++++
.../histogram/sql/SpectatorHistogramSqlUtils.java | 120 +++++
.../SpectatorHistogramAggregatorTest.java | 91 ++++
.../sql/SpectatorHistogramSqlAggregatorTest.java | 550 +++++++++++++++++++++
.../query/aggregation/post/PostAggregatorIds.java | 1 +
website/.spelling | 3 +
14 files changed, 1393 insertions(+), 41 deletions(-)
diff --git a/docs/development/extensions-contrib/spectator-histogram.md b/docs/development/extensions-contrib/spectator-histogram.md
index e6d12517e5c..64d21ff9ccd 100644
--- a/docs/development/extensions-contrib/spectator-histogram.md
+++ b/docs/development/extensions-contrib/spectator-histogram.md
@@ -79,8 +79,6 @@ Also see the [limitations](#limitations] of this extension.
* Supports positive long integer values within the range of [0, 2^53). Negatives are coerced to 0.
* Does not support decimals.
-* Does not support Druid SQL queries, only native queries.
-* Does not support vectorized queries.
* Generates 276 fixed buckets with increasing bucket widths. In practice, the observed error of computed percentiles ranges from 0.1% to 3%, exclusive. See [Bucket boundaries](#histogram-bucket-boundaries) for the full list of bucket boundaries.
:::tip
@@ -134,7 +132,11 @@ To use SpectatorHistogram, make sure you [include](../../configuration/extension
druid.extensions.loadList=["druid-spectator-histogram"]
```
-## Aggregators
+## Native Query Components
+
+The following sections describe the aggregators and post-aggregators for use with [native Druid queries](../../querying/querying.md).
+
+### Aggregators
The result of the aggregation is a histogram that is built by ingesting numeric values from
the raw data, or from combining pre-aggregated histograms. The result is represented in
@@ -207,9 +209,9 @@ To get the population size (count of events contributing to the histogram):
| name | A String for the output (result) name of the aggregation. | yes |
| fieldName | A String for the name of the input field containing pre-aggregated histograms. | yes |
-## Post Aggregators
+### Post Aggregators
-### Percentile (singular)
+#### Percentile (singular)
This returns a single percentile calculation based on the distribution of the
values in the aggregated histogram.
```
@@ -231,7 +233,7 @@ This returns a single percentile calculation based on the distribution of the va
| field | A field reference pointing to the aggregated histogram. | yes |
| percentile | A single decimal percentile between 0.0 and 100.0 | yes |
-### Percentiles (multiple)
+#### Percentiles (multiple)
This returns an array of percentiles corresponding to those requested.
```
@@ -272,6 +274,133 @@ array of percentiles.
| field | A field reference pointing to the aggregated histogram. | yes |
| percentiles | Non-empty array of decimal percentiles between 0.0 and 100.0 | yes |
+#### Count Post-Aggregator
+
+This returns the total count of observations (data points) recorded in the histogram, that is, the sum of all bucket counts. This is useful for getting the population size without a separate count metric.
+
+```json
+{
+ "type": "countSpectatorHistogram",
+ "name": "<output name>",
+ "field": {
+ "type": "fieldAccess",
+ "fieldName": "<name of aggregated SpectatorHistogram>"
+ }
+}
+```
+
+| Property | Description | Required? |
+|----------|------------------------------------------------------------|-----------|
+| type | This String should always be "countSpectatorHistogram" | yes |
+| name | A String for the output (result) name of the calculation. | yes |
+| field | A field reference pointing to the aggregated histogram. | yes |
+
+## SQL Functions
+
+In addition to the native query aggregators and post-aggregators, this extension provides SQL functions for easier use in Druid SQL queries.
+
+### SPECTATOR_COUNT
+
+Returns the total count of observations (data points) in a Spectator histogram.
+
+**Syntax:**
+```sql
+SPECTATOR_COUNT(expr)
+```
+
+**Arguments:**
+- `expr`: A numeric column to aggregate into a histogram, or a pre-aggregated Spectator histogram column.
+
+**Returns:** BIGINT - the total number of observations, or `NULL` if the histogram is null or empty.
+
+**Example:**
+```sql
+SELECT
+ SPECTATOR_COUNT(hist_added) AS total_count,
+ SPECTATOR_COUNT(added) AS total_count_from_raw
+FROM wikipedia
+```
+
+### SPECTATOR_PERCENTILE
+
+Computes approximate percentile values from a Spectator histogram. This function supports two forms: a single percentile or multiple percentiles.
+
+#### Single Percentile
+
+**Syntax:**
+```sql
+SPECTATOR_PERCENTILE(expr, percentile)
+```
+
+**Arguments:**
+- `expr`: A numeric column to aggregate into a histogram, or a pre-aggregated Spectator histogram column.
+- `percentile`: A decimal value between 0 and 100 representing the desired percentile.
+
+**Returns:** DOUBLE - the approximate value at the specified percentile, or `NULL` if the histogram is null or empty.
+
+**Example:**
+```sql
+SELECT
+ SPECTATOR_PERCENTILE(hist_added, 50) AS median_added,
+ SPECTATOR_PERCENTILE(hist_added, 99) AS p99_added,
+ SPECTATOR_PERCENTILE(added, 95) AS p95_from_raw
+FROM wikipedia
+```
+
+#### Multiple Percentiles (Array)
+
+**Syntax:**
+```sql
+SPECTATOR_PERCENTILE(expr, ARRAY[p1, p2, ...])
+```
+
+**Arguments:**
+- `expr`: A numeric column to aggregate into a histogram, or a pre-aggregated Spectator histogram column.
+- `ARRAY[p1, p2, ...]`: An array of decimal values between 0 and 100 representing the desired percentiles.
+
+**Returns:** DOUBLE ARRAY - an array of approximate values at the specified percentiles, in the requested order, or `NULL` if the histogram is null or empty.
+
+**Example:**
+```sql
+SELECT
+ SPECTATOR_PERCENTILE(hist_added, ARRAY[25, 50, 75, 99]) AS percentiles
+FROM wikipedia
+```
+
+This returns an array like `[200.5, 341.0, 468.5, 675.9]` representing the 25th, 50th, 75th, and 99th percentiles.
+
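+The array form returns all requested percentiles in a single column and computes them in one post-aggregator instead of one per percentile. Separate `SPECTATOR_PERCENTILE` calls on the same column also reuse a single underlying histogram aggregator, so in both cases the histogram is aggregated only once.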
+
+### Combined Example
+
+You can use both functions together in a single query. Multiple aggregations on the same column share the underlying histogram aggregator for efficiency:
+
+```sql
+SELECT
+ countryName,
+ SPECTATOR_COUNT(hist_added) AS observation_count,
+ SPECTATOR_PERCENTILE(hist_added, 50) AS median_added,
+ SPECTATOR_PERCENTILE(hist_added, 90) AS p90_added,
+ SPECTATOR_PERCENTILE(hist_added, 99) AS p99_added
+FROM wikipedia
+GROUP BY countryName
+ORDER BY observation_count DESC
+LIMIT 10
+```
+
+Or using the array form to get multiple percentiles in a single column:
+
+```sql
+SELECT
+ countryName,
+ SPECTATOR_COUNT(hist_added) AS observation_count,
+ SPECTATOR_PERCENTILE(hist_added, ARRAY[50, 90, 99]) AS percentiles
+FROM wikipedia
+GROUP BY countryName
+ORDER BY observation_count DESC
+LIMIT 10
+```
+
## Examples
### Example Ingestion Spec
diff --git a/docs/querying/sql-aggregations.md b/docs/querying/sql-aggregations.md
index 90d4ae05375..c251690ce71 100644
--- a/docs/querying/sql-aggregations.md
+++ b/docs/querying/sql-aggregations.md
@@ -157,3 +157,15 @@ Load the T-Digest extension to use the following functions. See the [T-Digest ex
|--------|-----|-------|
|`TDIGEST_QUANTILE(expr, quantileFraction, [compression])`|Builds a T-Digest sketch on values produced by `expr` and returns the value for the quantile. Compression parameter (default value 100) determines the accuracy and size of the sketch. Higher compression means higher accuracy but more space to store sketches.|`Double.NaN`|
|`TDIGEST_GENERATE_SKETCH(expr, [compression])`|Builds a T-Digest sketch on values produced by `expr`. Compression parameter (default value 100) determines the accuracy and size of the sketch Higher compression means higher accuracy but more space to store sketches.|Empty base64 encoded T-Digest sketch STRING|
+
+## Histogram functions
+
+### Spectator Histogram
+
+Load the [Spectator Histogram extension](../development/extensions-contrib/spectator-histogram.md) to use the following functions.
+
+|Function|Notes|Default|
+|--------|-----|-------|
+|`SPECTATOR_COUNT(expr)`|Counts the total number of observations (data points) in a Spectator histogram. The `expr` can be either a numeric column (which will be aggregated into a histogram) or a pre-aggregated [Spectator histogram](../development/extensions-contrib/spectator-histogram.md) column.|`0`|
+|`SPECTATOR_PERCENTILE(expr, percentile)`|Computes an approximate percentile value from a Spectator histogram. The `expr` can be either a numeric column (which will be aggregated into a histogram) or a pre-aggregated [Spectator histogram](../development/extensions-contrib/spectator-histogram.md) column. The `percentile` should be between 0 and 100.|`NaN`|
+|`SPECTATOR_PERCENTILE(expr, ARRAY[p1, p2, ...])`|Computes multiple approximate percentile values from a Spectator histogram and returns them as a DOUBLE ARRAY. The `expr` can be either a numeric column (which will be aggregated into a histogram) or a pre-aggregated [Spectator histogram](../development/extensions-contrib/spectator-histogram.md) column. Each percentile value in the array should be between 0 and 100. This is more efficient than calling `SPECTATOR_PERCENTILE` multiple times [...]
diff --git a/extensions-contrib/spectator-histogram/pom.xml b/extensions-contrib/spectator-histogram/pom.xml
index 90c4cf7d8ba..bd181fc2186 100644
--- a/extensions-contrib/spectator-histogram/pom.xml
+++ b/extensions-contrib/spectator-histogram/pom.xml
@@ -120,6 +120,36 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-migrationsupport</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorFactory.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorFactory.java
index fe5d6f81f0e..a7dffd74f06 100644
--- a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorFactory.java
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorFactory.java
@@ -284,7 +284,7 @@ public class SpectatorHistogramAggregatorFactory extends AggregatorFactory
@Override
public int hashCode()
{
- return Objects.hash(name, fieldName);
+ return name.hashCode() * 31 + fieldName.hashCode();
}
@Override
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramCountPostAggregator.java
similarity index 68%
copy from extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java
copy to extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramCountPostAggregator.java
index 29da04c67cc..196cf9bc740 100644
--- a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramCountPostAggregator.java
@@ -22,7 +22,7 @@ package org.apache.druid.spectator.histogram;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.Doubles;
+import com.google.common.primitives.Longs;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
@@ -35,30 +35,25 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
-public class SpectatorHistogramPercentilePostAggregator implements PostAggregator
+/**
+ * Post-aggregator that returns the total count of observations in a SpectatorHistogram.
+ * This is the sum of all bucket counts.
+ */
+public class SpectatorHistogramCountPostAggregator implements PostAggregator
{
-
private final String name;
private final PostAggregator field;
- private final double percentile;
-
- public static final String TYPE_NAME = "percentileSpectatorHistogram";
+ public static final String TYPE_NAME = "countSpectatorHistogram";
@JsonCreator
- public SpectatorHistogramPercentilePostAggregator(
+ public SpectatorHistogramCountPostAggregator(
@JsonProperty("name") final String name,
- @JsonProperty("field") final PostAggregator field,
- @JsonProperty("percentile") final double percentile
+ @JsonProperty("field") final PostAggregator field
)
{
this.name = Preconditions.checkNotNull(name, "name is null");
this.field = Preconditions.checkNotNull(field, "field is null");
- Preconditions.checkArgument(
- percentile >= 0 && percentile <= 100,
- "Percentile argument not in range (0, 100)"
- );
- this.percentile = percentile;
}
@Override
@@ -71,7 +66,7 @@ public class SpectatorHistogramPercentilePostAggregator implements PostAggregato
@Override
public ColumnType getType(ColumnInspector signature)
{
- return ColumnType.DOUBLE;
+ return ColumnType.LONG;
}
@JsonProperty
@@ -80,12 +75,6 @@ public class SpectatorHistogramPercentilePostAggregator implements PostAggregato
return field;
}
- @JsonProperty
- public double getPercentile()
- {
- return percentile;
- }
-
@Override
public Object compute(final Map<String, Object> combinedAggregators)
{
@@ -93,13 +82,13 @@ public class SpectatorHistogramPercentilePostAggregator implements PostAggregato
if (sketch == null) {
return null;
}
- return sketch.getPercentileValue(percentile);
+ return sketch.getSum();
}
@Override
- public Comparator<Double> getComparator()
+ public Comparator<Long> getComparator()
{
- return Doubles::compare;
+ return Longs::compare;
}
@Override
@@ -114,17 +103,16 @@ public class SpectatorHistogramPercentilePostAggregator implements PostAggregato
return getClass().getSimpleName() + "{" +
"name='" + name + '\'' +
", field=" + field +
- ", fraction=" + percentile +
"}";
}
@Override
public byte[] getCacheKey()
{
- final CacheKeyBuilder builder = new CacheKeyBuilder(
- PostAggregatorIds.SPECTATOR_HISTOGRAM_SKETCH_PERCENTILE_CACHE_TYPE_ID).appendCacheable(field);
- builder.appendDouble(percentile);
- return builder.build();
+ return new CacheKeyBuilder(
+ PostAggregatorIds.SPECTATOR_HISTOGRAM_SKETCH_COUNT_CACHE_TYPE_ID)
+ .appendCacheable(field)
+ .build();
}
@Override
@@ -136,16 +124,15 @@ public class SpectatorHistogramPercentilePostAggregator implements PostAggregato
if (o == null || getClass() != o.getClass()) {
return false;
}
- SpectatorHistogramPercentilePostAggregator that = (SpectatorHistogramPercentilePostAggregator) o;
- return Double.compare(that.percentile, percentile) == 0 &&
- Objects.equals(name, that.name) &&
+ SpectatorHistogramCountPostAggregator that = (SpectatorHistogramCountPostAggregator) o;
+ return Objects.equals(name, that.name) &&
Objects.equals(field, that.field);
}
@Override
public int hashCode()
{
- return Objects.hash(name, field, percentile);
+ return name.hashCode() * 31 + field.hashCode();
}
@Override
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramModule.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramModule.java
index b12c600d6b4..24a591da5de 100644
--- a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramModule.java
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramModule.java
@@ -27,6 +27,9 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.spectator.histogram.sql.SpectatorHistogramCountSqlAggregator;
+import org.apache.druid.spectator.histogram.sql.SpectatorHistogramPercentileSqlAggregator;
+import org.apache.druid.sql.guice.SqlBindings;
import java.util.List;
@@ -78,6 +81,10 @@ public class SpectatorHistogramModule implements DruidModule
new NamedType(
SpectatorHistogramPercentilesPostAggregator.class,
SpectatorHistogramPercentilesPostAggregator.TYPE_NAME
+ ),
+ new NamedType(
+ SpectatorHistogramCountPostAggregator.class,
+ SpectatorHistogramCountPostAggregator.TYPE_NAME
)
).addSerializer(SpectatorHistogram.class, new SpectatorHistogramJsonSerializer())
);
@@ -87,5 +94,8 @@ public class SpectatorHistogramModule implements DruidModule
public void configure(Binder binder)
{
registerSerde();
+
+ SqlBindings.addAggregator(binder, SpectatorHistogramPercentileSqlAggregator.class);
+ SqlBindings.addAggregator(binder, SpectatorHistogramCountSqlAggregator.class);
}
}
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java
index 29da04c67cc..97dc71c0a18 100644
--- a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/SpectatorHistogramPercentilePostAggregator.java
@@ -145,7 +145,7 @@ public class SpectatorHistogramPercentilePostAggregator implements PostAggregato
@Override
public int hashCode()
{
- return Objects.hash(name, field, percentile);
+ return (name.hashCode() * 31 + field.hashCode()) * 31 + Double.hashCode(percentile);
}
@Override
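This commit replaces `Objects.hash(...)` with hand-rolled arithmetic in several `hashCode()` methods. The commit does not state a reason; one likely motivation is that the explicit form avoids the varargs array (and, for `percentile`, the boxing) that `Objects.hash` performs. The standalone sketch below, using a hypothetical helper class `HashCodeComparison`, shows how the two forms relate: `Objects.hash` seeds its accumulator with 1, so for n fields its result equals the hand-rolled value plus 31^n (961 for two fields, 29791 for three). One behavioral difference is that the hand-rolled form throws `NullPointerException` on a null field, whereas `Objects.hash` treats null as 0; the post-aggregators reject a null `name` or `field` in their constructors. A `String` stands in for `PostAggregator` here.

```java
import java.util.Objects;

/** Hypothetical helper comparing Objects.hash with the hand-rolled form used in this commit. */
final class HashCodeComparison {
  // Same arithmetic as SpectatorHistogramAggregatorFactory#hashCode after this commit.
  static int twoFieldHash(String name, String fieldName) {
    return name.hashCode() * 31 + fieldName.hashCode();
  }

  // Same arithmetic as SpectatorHistogramPercentilePostAggregator#hashCode after this commit;
  // a String stands in for the PostAggregator field.
  static int threeFieldHash(String name, String field, double percentile) {
    return (name.hashCode() * 31 + field.hashCode()) * 31 + Double.hashCode(percentile);
  }

  public static void main(String[] args) {
    // Objects.hash computes 31 * (... (31 * 1 + h1) ...) + hn, i.e. the hand-rolled value plus 31^n.
    // int overflow wraps identically on both sides, so the difference is exact.
    System.out.println(Objects.hash("p99", "hist_added") - twoFieldHash("p99", "hist_added")); // 961
    System.out.println(
        Objects.hash("p99", "hist_added", 99.0) - threeFieldHash("p99", "hist_added", 99.0)); // 29791
  }
}
```

Because the two forms differ only by a constant, equal objects still produce equal hash codes and the equals/hashCode contract is unaffected; only the concrete hash values change.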
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramCountSqlAggregator.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramCountSqlAggregator.java
new file mode 100644
index 00000000000..3886639370b
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramCountSqlAggregator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram.sql;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Optionality;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.spectator.histogram.SpectatorHistogramAggregatorFactory;
+import org.apache.druid.spectator.histogram.SpectatorHistogramCountPostAggregator;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.Aggregations;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * SQL aggregator for counting total observations in SpectatorHistograms.
+ *
+ * SPECTATOR_COUNT(column) -> BIGINT
+ *
+ * Returns the sum of all bucket counts (total number of observations recorded in the histogram).
+ */
+public class SpectatorHistogramCountSqlAggregator implements SqlAggregator
+{
+ private static final SqlAggFunction FUNCTION_INSTANCE = new SpectatorHistogramCountSqlAggFunction();
+ private static final String NAME = "SPECTATOR_COUNT";
+
+ @Override
+ public SqlAggFunction calciteFunction()
+ {
+ return FUNCTION_INSTANCE;
+ }
+
+ @Nullable
+ @Override
+ public Aggregation toDruidAggregation(
+ final PlannerContext plannerContext,
+ final VirtualColumnRegistry virtualColumnRegistry,
+ final String name,
+ final AggregateCall aggregateCall,
+ final InputAccessor inputAccessor,
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
+ )
+ {
+ final DruidExpression input = Aggregations.toDruidExpressionForNumericAggregator(
+ plannerContext,
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
+ );
+ if (input == null) {
+ return null;
+ }
+
+ final String histogramName = StringUtils.format("%s:agg", name);
+
+ // Look for existing matching aggregatorFactory
+ final SpectatorHistogramAggregatorFactory existingFactory =
+ SpectatorHistogramSqlUtils.findMatchingAggregatorFactory(
+ virtualColumnRegistry,
+ input,
+ existingAggregations
+ );
+
+ if (existingFactory != null) {
+ return Aggregation.create(
+ ImmutableList.of(),
+ new SpectatorHistogramCountPostAggregator(
+ name,
+ new FieldAccessPostAggregator(
+ existingFactory.getName(),
+ existingFactory.getName()
+ )
+ )
+ );
+ }
+
+ // No existing match found. Create a new one.
+ final SpectatorHistogramAggregatorFactory aggregatorFactory =
+ SpectatorHistogramSqlUtils.createAggregatorFactory(
+ virtualColumnRegistry,
+ input,
+ histogramName
+ );
+
+ return Aggregation.create(
+ ImmutableList.of(aggregatorFactory),
+ new SpectatorHistogramCountPostAggregator(
+ name,
+ new FieldAccessPostAggregator(histogramName, histogramName)
+ )
+ );
+ }
+
+ private static class SpectatorHistogramCountSqlAggFunction extends SqlAggFunction
+ {
+ SpectatorHistogramCountSqlAggFunction()
+ {
+ super(
+ NAME,
+ null,
+ SqlKind.OTHER_FUNCTION,
+ ReturnTypes.explicit(SqlTypeName.BIGINT),
+ null,
+ OperandTypes.ANY,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ false,
+ false,
+ Optionality.FORBIDDEN
+ );
+ }
+ }
+}
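Both SQL aggregators call `SpectatorHistogramSqlUtils.findMatchingAggregatorFactory` before creating a new histogram aggregator named `<name>:agg`; that lookup is what lets several `SPECTATOR_COUNT` / `SPECTATOR_PERCENTILE` calls over the same input share one aggregator. `SpectatorHistogramSqlUtils` is not shown in this excerpt, so the Java sketch below is only a hypothetical model of the reuse: the class `HistogramAggregatorReuse` is invented for illustration, plain strings stand in for `DruidExpression`, `AggregatorFactory`, and `PostAggregator`, and matching is assumed to be by input-expression text.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of histogram-aggregator reuse across SQL calls (not Druid's API). */
final class HistogramAggregatorReuse {
  // Input expression -> name of the histogram aggregator already planned for it.
  private final Map<String, String> aggregatorByInput = new LinkedHashMap<>();
  // Post-aggregators planned so far, rendered as "type(output <- histogram)".
  private final List<String> postAggregators = new ArrayList<>();

  /** Plans one SQL call; returns the histogram aggregator its post-aggregator reads. */
  String plan(String outputName, String inputExpression, String postAggType) {
    // Reuse an existing aggregator over the same input, otherwise register "<name>:agg".
    String histogramName =
        aggregatorByInput.computeIfAbsent(inputExpression, unused -> outputName + ":agg");
    postAggregators.add(postAggType + "(" + outputName + " <- " + histogramName + ")");
    return histogramName;
  }

  int aggregatorCount() {
    return aggregatorByInput.size();
  }

  List<String> postAggregators() {
    return postAggregators;
  }

  public static void main(String[] args) {
    HistogramAggregatorReuse planner = new HistogramAggregatorReuse();
    planner.plan("observation_count", "hist_added", "countSpectatorHistogram");
    planner.plan("median_added", "hist_added", "percentileSpectatorHistogram");
    planner.plan("p99_added", "hist_added", "percentileSpectatorHistogram");
    System.out.println(planner.aggregatorCount()); // 1
    planner.postAggregators().forEach(System.out::println);
  }
}
```

In this model the first call over `hist_added` owns the aggregator and later calls only add post-aggregators, mirroring the `Aggregation.create(ImmutableList.of(), ...)` branch in `toDruidAggregation`.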
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramPercentileSqlAggregator.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramPercentileSqlAggregator.java
new file mode 100644
index 00000000000..987700ebd1a
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramPercentileSqlAggregator.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram.sql;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Optionality;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.spectator.histogram.SpectatorHistogramAggregatorFactory;
+import org.apache.druid.spectator.histogram.SpectatorHistogramPercentilePostAggregator;
+import org.apache.druid.spectator.histogram.SpectatorHistogramPercentilesPostAggregator;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.aggregation.Aggregations;
+import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.planner.PlannerContext;
+import org.apache.druid.sql.calcite.rel.InputAccessor;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * SQL aggregator for computing percentiles from SpectatorHistograms.
+ * <p>
+ * Supports two forms:
+ * - SPECTATOR_PERCENTILE(column, percentile) -> DOUBLE (single percentile)
+ * - SPECTATOR_PERCENTILE(column, ARRAY[p1, p2, ...]) -> DOUBLE ARRAY (multiple percentiles)
+ * <p>
+ * Percentile values should be in the range [0, 100] (e.g., 95 for p95).
+ */
+public class SpectatorHistogramPercentileSqlAggregator implements SqlAggregator
+{
+ private static final SqlAggFunction FUNCTION_INSTANCE = new SpectatorHistogramPercentileSqlAggFunction();
+ private static final String NAME = "SPECTATOR_PERCENTILE";
+
+ @Override
+ public SqlAggFunction calciteFunction()
+ {
+ return FUNCTION_INSTANCE;
+ }
+
+ @Nullable
+ @Override
+ public Aggregation toDruidAggregation(
+ final PlannerContext plannerContext,
+ final VirtualColumnRegistry virtualColumnRegistry,
+ final String name,
+ final AggregateCall aggregateCall,
+ final InputAccessor inputAccessor,
+ final List<Aggregation> existingAggregations,
+ final boolean finalizeAggregations
+ )
+ {
+ final DruidExpression input = Aggregations.toDruidExpressionForNumericAggregator(
+ plannerContext,
+ inputAccessor.getInputRowSignature(),
+ inputAccessor.getField(aggregateCall.getArgList().get(0))
+ );
+ if (input == null) {
+ return null;
+ }
+
+ final RexNode percentileArg = inputAccessor.getField(aggregateCall.getArgList().get(1));
+
+ // Check if percentile argument is an array or a single value
+ if (percentileArg.isA(SqlKind.ARRAY_VALUE_CONSTRUCTOR)) {
+ return handleArrayPercentiles(
+ virtualColumnRegistry,
+ name,
+ input,
+ percentileArg,
+ existingAggregations
+ );
+ } else if (percentileArg.isA(SqlKind.LITERAL)) {
+ return handleSinglePercentile(
+ virtualColumnRegistry,
+ name,
+ input,
+ percentileArg,
+ existingAggregations
+ );
+ }
+
+ // Cannot handle non-literal percentile arguments
+ return null;
+ }
+
+ private Aggregation handleSinglePercentile(
+ final VirtualColumnRegistry virtualColumnRegistry,
+ final String name,
+ final DruidExpression input,
+ final RexNode percentileArg,
+ final List<Aggregation> existingAggregations
+ )
+ {
+ final double percentile = ((Number) RexLiteral.value(percentileArg)).doubleValue();
+
+ final String histogramName = StringUtils.format("%s:agg", name);
+
+ // Look for existing matching aggregatorFactory
+ final SpectatorHistogramAggregatorFactory existingFactory =
+ SpectatorHistogramSqlUtils.findMatchingAggregatorFactory(
+ virtualColumnRegistry,
+ input,
+ existingAggregations
+ );
+
+ if (existingFactory != null) {
+ return Aggregation.create(
+ ImmutableList.of(),
+ new SpectatorHistogramPercentilePostAggregator(
+ name,
+ new FieldAccessPostAggregator(
+ existingFactory.getName(),
+ existingFactory.getName()
+ ),
+ percentile
+ )
+ );
+ }
+
+ // No existing match found. Create a new one.
+ final SpectatorHistogramAggregatorFactory aggregatorFactory =
+ SpectatorHistogramSqlUtils.createAggregatorFactory(
+ virtualColumnRegistry,
+ input,
+ histogramName
+ );
+
+ return Aggregation.create(
+ ImmutableList.of(aggregatorFactory),
+ new SpectatorHistogramPercentilePostAggregator(
+ name,
+ new FieldAccessPostAggregator(histogramName, histogramName),
+ percentile
+ )
+ );
+ }
+
+ @Nullable
+ private Aggregation handleArrayPercentiles(
+ final VirtualColumnRegistry virtualColumnRegistry,
+ final String name,
+ final DruidExpression input,
+ final RexNode percentileArg,
+ final List<Aggregation> existingAggregations
+ )
+ {
+ // Extract array elements
+ final List<RexNode> arrayElements = ((RexCall) percentileArg).getOperands();
+ final double[] percentiles = new double[arrayElements.size()];
+
+ for (int i = 0; i < arrayElements.size(); i++) {
+ RexNode element = arrayElements.get(i);
+ if (!element.isA(SqlKind.LITERAL)) {
+ return null; // All array elements must be literals
+ }
+ percentiles[i] = ((Number) RexLiteral.value(element)).doubleValue();
+ }
+
+ final String histogramName = StringUtils.format("%s:agg", name);
+
+ // Look for existing matching aggregatorFactory
+ final SpectatorHistogramAggregatorFactory existingFactory =
+ SpectatorHistogramSqlUtils.findMatchingAggregatorFactory(
+ virtualColumnRegistry,
+ input,
+ existingAggregations
+ );
+
+ if (existingFactory != null) {
+ return Aggregation.create(
+ ImmutableList.of(),
+ new SpectatorHistogramPercentilesPostAggregator(
+ name,
+ new FieldAccessPostAggregator(
+ existingFactory.getName(),
+ existingFactory.getName()
+ ),
+ percentiles
+ )
+ );
+ }
+
+ // No existing match found. Create a new one.
+ final SpectatorHistogramAggregatorFactory aggregatorFactory =
+ SpectatorHistogramSqlUtils.createAggregatorFactory(
+ virtualColumnRegistry,
+ input,
+ histogramName
+ );
+
+ return Aggregation.create(
+ ImmutableList.of(aggregatorFactory),
+ new SpectatorHistogramPercentilesPostAggregator(
+ name,
+ new FieldAccessPostAggregator(histogramName, histogramName),
+ percentiles
+ )
+ );
+ }
+
+ /**
+ * Return type inference that returns DOUBLE for single percentile value
+ * and DOUBLE ARRAY when an array of percentiles is provided.
+ */
+ static class SpectatorHistogramPercentileReturnTypeInference implements SqlReturnTypeInference
+ {
+ @Override
+ public RelDataType inferReturnType(SqlOperatorBinding sqlOperatorBinding)
+ {
+ RelDataType secondArgType = sqlOperatorBinding.getOperandType(1);
+ if (secondArgType.getSqlTypeName() == SqlTypeName.ARRAY) {
+ // Return DOUBLE ARRAY when input is an array of percentiles
+ return sqlOperatorBinding.getTypeFactory().createArrayType(
+ sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.DOUBLE),
+ -1
+ );
+ }
+ // Return DOUBLE for single percentile value
+ return sqlOperatorBinding.getTypeFactory().createSqlType(SqlTypeName.DOUBLE);
+ }
+ }
+
+ private static class SpectatorHistogramPercentileSqlAggFunction extends SqlAggFunction
+ {
+ private static final SpectatorHistogramPercentileReturnTypeInference RETURN_TYPE_INFERENCE =
+ new SpectatorHistogramPercentileReturnTypeInference();
+
+ SpectatorHistogramPercentileSqlAggFunction()
+ {
+ super(
+ NAME,
+ null,
+ SqlKind.OTHER_FUNCTION,
+ RETURN_TYPE_INFERENCE,
+ null,
+ OperandTypes.ANY_ANY,
+ SqlFunctionCategory.USER_DEFINED_FUNCTION,
+ false,
+ false,
+ Optionality.FORBIDDEN
+ );
+ }
+ }
+}
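The two `handle*Percentile*` methods above follow one plan: read the literal percentile argument (a single number, or an `ARRAY[...]` whose elements must all be literals), reuse a `SpectatorHistogramAggregatorFactory` that already reads the same input if one exists, and otherwise create one named `<name>:agg`; the return type is DOUBLE for a scalar argument and a DOUBLE array for an array argument. The dependency-free sketch below models that flow under simplifying assumptions: `PercentilePlanSketch`, `HistogramAgg`, and `Plan` are hypothetical names, and inputs are plain column names rather than `DruidExpression`s.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, dependency-free model of handleSinglePercentile / handleArrayPercentiles.
// None of these names are Druid APIs; inputs are simplified to plain column names
// (the real code also matches virtual-column expressions).
final class PercentilePlanSketch
{
  static final class HistogramAgg
  {
    final String name;      // output name of the histogram aggregator
    final String fieldName; // column the aggregator reads

    HistogramAgg(String name, String fieldName)
    {
      this.name = name;
      this.fieldName = fieldName;
    }
  }

  static final class Plan
  {
    final List<HistogramAgg> newAggregators; // empty when an existing aggregator is reused
    final String histogramRef;               // aggregator the post-aggregator reads via field access
    final double[] percentiles;
    final boolean returnsArray;              // DOUBLE ARRAY for ARRAY[...] input, DOUBLE otherwise

    Plan(List<HistogramAgg> newAggregators, String histogramRef, double[] percentiles, boolean returnsArray)
    {
      this.newAggregators = newAggregators;
      this.histogramRef = histogramRef;
      this.percentiles = percentiles;
      this.returnsArray = returnsArray;
    }
  }

  /**
   * @param percentileArg a Number for a scalar literal, or a List for an ARRAY[...] literal
   * @return the plan, or null when an array element is not a numeric literal
   */
  static Plan plan(String name, String inputColumn, Object percentileArg, List<HistogramAgg> existing)
  {
    final double[] percentiles;
    final boolean returnsArray = percentileArg instanceof List;
    if (returnsArray) {
      final List<?> elements = (List<?>) percentileArg;
      percentiles = new double[elements.size()];
      for (int i = 0; i < elements.size(); i++) {
        if (!(elements.get(i) instanceof Number)) {
          return null; // every array element must be a literal
        }
        percentiles[i] = ((Number) elements.get(i)).doubleValue();
      }
    } else {
      percentiles = new double[]{((Number) percentileArg).doubleValue()};
    }

    // Reuse an aggregator that already reads the same input, so only a post-aggregator is added.
    for (HistogramAgg agg : existing) {
      if (agg.fieldName.equals(inputColumn)) {
        return new Plan(Collections.emptyList(), agg.name, percentiles, returnsArray);
      }
    }

    // Otherwise create a new aggregator named "<name>:agg".
    final String histogramName = name + ":agg";
    final List<HistogramAgg> created = new ArrayList<>();
    created.add(new HistogramAgg(histogramName, inputColumn));
    return new Plan(created, histogramName, percentiles, returnsArray);
  }
}
```

Under these rules, several SPECTATOR_* calls on `histogram_metric` in one query end up reading a single `a0:agg` aggregator, which is the sharing that `testMultipleAggregationsOnSameColumn` checks for.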
diff --git a/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramSqlUtils.java b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramSqlUtils.java
new file mode 100644
index 00000000000..33c27f34990
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/main/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramSqlUtils.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram.sql;
+
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.spectator.histogram.SpectatorHistogramAggregatorFactory;
+import org.apache.druid.sql.calcite.aggregation.Aggregation;
+import org.apache.druid.sql.calcite.expression.DruidExpression;
+import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ * Utility methods shared by SpectatorHistogram SQL aggregators.
+ */
+public class SpectatorHistogramSqlUtils
+{
+ private SpectatorHistogramSqlUtils()
+ {
+ // Utility class
+ }
+
+ /**
+ * Finds an existing SpectatorHistogramAggregatorFactory that matches the given input expression.
+ *
+ * @param virtualColumnRegistry the virtual column registry
+ * @param input the input DruidExpression
+ * @param existingAggregations list of existing aggregations to search through
+ * @return matching factory if found, null otherwise
+ */
+ @Nullable
+ public static SpectatorHistogramAggregatorFactory findMatchingAggregatorFactory(
+ final VirtualColumnRegistry virtualColumnRegistry,
+ final DruidExpression input,
+ final List<Aggregation> existingAggregations
+ )
+ {
+ for (final Aggregation existing : existingAggregations) {
+ for (AggregatorFactory factory : existing.getAggregatorFactories()) {
+ if (factory instanceof SpectatorHistogramAggregatorFactory) {
+ final SpectatorHistogramAggregatorFactory histogramFactory =
+ (SpectatorHistogramAggregatorFactory) factory;
+ if (inputMatches(virtualColumnRegistry, input, histogramFactory)) {
+ return histogramFactory;
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Checks if the input expression matches the given aggregator factory.
+ */
+ private static boolean inputMatches(
+ final VirtualColumnRegistry virtualColumnRegistry,
+ final DruidExpression input,
+ final SpectatorHistogramAggregatorFactory factory
+ )
+ {
+ final DruidExpression virtualInput =
+ virtualColumnRegistry.findVirtualColumnExpressions(factory.requiredFields())
+ .stream()
+ .findFirst()
+ .orElse(null);
+
+ if (virtualInput == null) {
+ return input.isDirectColumnAccess() && input.getDirectColumn().equals(factory.getFieldName());
+ } else {
+ return virtualInput.equals(input);
+ }
+ }
+
+ /**
+ * Creates a new SpectatorHistogramAggregatorFactory for the given input expression.
+ *
+ * @param virtualColumnRegistry the virtual column registry
+ * @param input the input DruidExpression
+ * @param histogramName the name for the new aggregator factory
+ * @return a new SpectatorHistogramAggregatorFactory
+ */
+ public static SpectatorHistogramAggregatorFactory createAggregatorFactory(
+ final VirtualColumnRegistry virtualColumnRegistry,
+ final DruidExpression input,
+ final String histogramName
+ )
+ {
+ if (input.isDirectColumnAccess()) {
+ return new SpectatorHistogramAggregatorFactory(
+ histogramName,
+ input.getDirectColumn()
+ );
+ } else {
+ String virtualColumnName = virtualColumnRegistry.getOrCreateVirtualColumnForExpression(
+ input,
+ ColumnType.DOUBLE
+ );
+ return new SpectatorHistogramAggregatorFactory(histogramName, virtualColumnName);
+ }
+ }
+}
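`SpectatorHistogramSqlUtils` decides when two SQL calls can share an aggregator: a direct column reference matches a factory whose `fieldName` is that column, while an expression input matches only if the factory reads a virtual column built from the same expression. `createAggregatorFactory` registers such a virtual column (typed DOUBLE) when the input is not a plain column. The sketch below models that logic with hypothetical stand-ins (`Input`, `VirtualColumns`) for `DruidExpression` and `VirtualColumnRegistry`; the `v0`, `v1`, ... naming is assumed for illustration, and column types are not modeled.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of inputMatches / createAggregatorFactory. Input and VirtualColumns are
// illustrative stand-ins for DruidExpression and VirtualColumnRegistry, not their real APIs.
final class InputMatchSketch
{
  /** A SQL input that is either a direct column reference or an expression. */
  static final class Input
  {
    final String column;     // non-null for direct column access
    final String expression; // non-null otherwise

    private Input(String column, String expression)
    {
      this.column = column;
      this.expression = expression;
    }

    static Input ofColumn(String name)
    {
      return new Input(name, null);
    }

    static Input ofExpression(String expr)
    {
      return new Input(null, expr);
    }

    boolean isDirectColumnAccess()
    {
      return column != null;
    }
  }

  /** Registers each distinct expression once, naming the columns v0, v1, ... */
  static final class VirtualColumns
  {
    private final Map<String, String> nameByExpression = new LinkedHashMap<>();

    String getOrCreate(String expression)
    {
      String name = nameByExpression.get(expression);
      if (name == null) {
        name = "v" + nameByExpression.size();
        nameByExpression.put(expression, name);
      }
      return name;
    }

    /** Returns the expression behind a virtual column, or null for a physical column. */
    String expressionFor(String columnName)
    {
      for (Map.Entry<String, String> entry : nameByExpression.entrySet()) {
        if (entry.getValue().equals(columnName)) {
          return entry.getKey();
        }
      }
      return null;
    }
  }

  /** Like createAggregatorFactory: read the column directly, or via a virtual column. */
  static String fieldFor(VirtualColumns registry, Input input)
  {
    return input.isDirectColumnAccess() ? input.column : registry.getOrCreate(input.expression);
  }

  /** Like inputMatches: compare expressions for virtual fields, column names otherwise. */
  static boolean inputMatches(VirtualColumns registry, Input input, String factoryFieldName)
  {
    final String virtualExpression = registry.expressionFor(factoryFieldName);
    if (virtualExpression == null) {
      return input.isDirectColumnAccess() && input.column.equals(factoryFieldName);
    }
    return virtualExpression.equals(input.expression);
  }
}
```

Registering the same expression twice yields the same virtual column, so a second call over that expression can match and reuse the first aggregator instead of building another one.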
diff --git a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java
index 15979b94376..2b402fb60ef 100644
--- a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java
+++ b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/SpectatorHistogramAggregatorTest.java
@@ -884,6 +884,97 @@ public class SpectatorHistogramAggregatorTest extends InitializedNullHandlingTes
Assert.assertNull("Row [5] should have null percentiles when histogram is null", results.get(5).get(2));
}
+ @Test
+ public void testCountPostAggregator() throws Exception
+ {
+ Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0,
+ Granularities.NONE,
+ 10,
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimensions\": [],",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_cost_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"postAggregations\": [",
+ " {\"type\": \"countSpectatorHistogram\", \"name\": \"count\", \"field\": {\"type\": \"fieldAccess\",\"fieldName\": \"merged_cost_histogram\"}}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(1, results.size());
+ // The merged histogram has 9 total observations (1+1+3+3+1 from the buckets)
+ Assert.assertEquals(9L, results.get(0).get(1));
+ }
+
+ @Test
+ public void testCountPostAggregatorWithNullSketch() throws Exception
+ {
+ Sequence<ResultRow> seq = helper.createIndexAndRunQueryOnSegment(
+ new File(this.getClass().getClassLoader().getResource("input_data.tsv").getFile()),
+ INPUT_DATA_PARSE_SPEC,
+ String.join(
+ "\n",
+ "[",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"histogram\", \"fieldName\": \"cost\"}",
+ "]"
+ ),
+ 0,
+ Granularities.NONE,
+ 10,
+ String.join(
+ "\n",
+ "{",
+ " \"queryType\": \"groupBy\",",
+ " \"dataSource\": \"test_datasource\",",
+ " \"granularity\": \"ALL\",",
+ " \"dimensions\": [\"product\"],",
+ " \"aggregations\": [",
+ " {\"type\": \"spectatorHistogram\", \"name\": \"merged_histogram\", \"fieldName\": "
+ + "\"histogram\"}",
+ " ],",
+ " \"postAggregations\": [",
+ " {\"type\": \"countSpectatorHistogram\", \"name\": \"count\", \"field\": {\"type\": \"fieldAccess\",\"fieldName\": \"merged_histogram\"}}",
+ " ],",
+ " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
+ "}"
+ )
+ );
+
+ List<ResultRow> results = seq.toList();
+ Assert.assertEquals(6, results.size());
+
+ // First three rows should have valid histograms and count values
+ // Product A: 1 observation
+ Assert.assertEquals(1L, results.get(0).get(2));
+ // Product B: 6 observations (1+3+2 from buckets at indices 30, 40, 50)
+ Assert.assertEquals(6L, results.get(1).get(2));
+ // Product C: 2 observations (1+1 from buckets at indices 50, 20000)
+ Assert.assertEquals(2L, results.get(2).get(2));
+
+ // Last three rows have null histograms, so count should also be null
+ Assert.assertNull("Row [3] should have null count when histogram is null", results.get(3).get(2));
+ Assert.assertNull("Row [4] should have null count when histogram is null", results.get(4).get(2));
+ Assert.assertNull("Row [5] should have null count when histogram is null", results.get(5).get(2));
+ }
+
private static void assertResultsMatch(List<ResultRow> results, int rowNum, String expectedProduct)
{
ResultRow row = results.get(rowNum);
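The new tests pin down what the `countSpectatorHistogram` post-aggregator reports: the total number of observations across all buckets of the merged histogram (9 for the whole data set, 6 for product B's buckets at indices 30, 40, and 50), and null for rows whose histogram is null. A minimal sketch of that arithmetic follows, assuming a histogram is modeled as a map from bucket index to count; `HistogramCountSketch` is a hypothetical name and the real `SpectatorHistogram` type is not used.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a histogram is modeled as bucket index -> observation count.
final class HistogramCountSketch
{
  /** Total observations across all buckets, or null when there is no histogram. */
  static Long count(Map<Integer, Long> bucketCounts)
  {
    if (bucketCounts == null) {
      return null; // mirrors the null counts asserted for rows without a histogram
    }
    long total = 0L;
    for (long c : bucketCounts.values()) {
      total += c;
    }
    return total;
  }

  /** Merging two bucketed histograms adds their counts bucket by bucket. */
  static Map<Integer, Long> merge(Map<Integer, Long> a, Map<Integer, Long> b)
  {
    final Map<Integer, Long> merged = new HashMap<>(a);
    b.forEach((bucket, c) -> merged.merge(bucket, c, Long::sum));
    return merged;
  }
}
```

Because counts are summed rather than deduplicated, the result is a total number of observations, which is why equal values landing in the same bucket each contribute to it.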
diff --git a/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramSqlAggregatorTest.java b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramSqlAggregatorTest.java
new file mode 100644
index 00000000000..63c8f35e8cc
--- /dev/null
+++ b/extensions-contrib/spectator-histogram/src/test/java/org/apache/druid/spectator/histogram/sql/SpectatorHistogramSqlAggregatorTest.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.spectator.histogram.sql;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker;
+import org.apache.druid.spectator.histogram.SpectatorHistogramAggregatorFactory;
+import org.apache.druid.spectator.histogram.SpectatorHistogramCountPostAggregator;
+import org.apache.druid.spectator.histogram.SpectatorHistogramModule;
+import org.apache.druid.spectator.histogram.SpectatorHistogramPercentilePostAggregator;
+import org.apache.druid.spectator.histogram.SpectatorHistogramPercentilesPostAggregator;
+import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
+import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
+import org.apache.druid.sql.calcite.TempDirProducer;
+import org.apache.druid.sql.calcite.filtration.Filtration;
+import org.apache.druid.sql.calcite.util.CalciteTests;
+import org.apache.druid.sql.calcite.util.DruidModuleCollection;
+import org.apache.druid.sql.calcite.util.SqlTestFramework.StandardComponentSupplier;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.LinearShardSpec;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@SqlTestFrameworkConfig.ComponentSupplier(SpectatorHistogramSqlAggregatorTest.SpectatorHistogramComponentSupplier.class)
+public class SpectatorHistogramSqlAggregatorTest extends BaseCalciteQueryTest
+{
+ private static final List<ImmutableMap<String, Object>> RAW_ROWS = ImmutableList.of(
+ ImmutableMap.of("t", "2000-01-01", "dim1", "a", "metric", 100L),
+ ImmutableMap.of("t", "2000-01-02", "dim1", "b", "metric", 200L),
+ ImmutableMap.of("t", "2000-01-03", "dim1", "c", "metric", 300L),
+ ImmutableMap.of("t", "2000-01-04", "dim1", "d", "metric", 400L),
+ ImmutableMap.of("t", "2000-01-05", "dim1", "e", "metric", 500L),
+ ImmutableMap.of("t", "2000-01-06", "dim1", "f", "metric", 600L)
+ );
+
+ private static final MapInputRowParser PARSER = new MapInputRowParser(
+ new TimeAndDimsParseSpec(
+ new TimestampSpec("t", "auto", null),
+ DimensionsSpec.builder()
+ .setDimensions(ImmutableList.of(
+ new StringDimensionSchema("dim1"),
+ new LongDimensionSchema("metric")
+ ))
+ .setDimensionExclusions(ImmutableList.of("t"))
+ .build()
+ )
+ );
+
+ private static final List<InputRow> ROWS = RAW_ROWS.stream()
+ .map(raw -> PARSER.parseBatch(raw).get(0))
+ .collect(Collectors.toList());
+
+ protected static class SpectatorHistogramComponentSupplier extends StandardComponentSupplier
+ {
+ public SpectatorHistogramComponentSupplier(TempDirProducer tempFolderProducer)
+ {
+ super(tempFolderProducer);
+ }
+
+ @Override
+ public DruidModule getCoreModule()
+ {
+ return DruidModuleCollection.of(super.getCoreModule(), new SpectatorHistogramModule());
+ }
+
+ @Override
+ public SpecificSegmentsQuerySegmentWalker addSegmentsToWalker(SpecificSegmentsQuerySegmentWalker walker)
+ {
+ SpectatorHistogramModule.registerSerde();
+
+ final QueryableIndex index =
+ IndexBuilder.create(CalciteTests.getJsonMapper())
+ .tmpDir(tempDirProducer.newTempFolder())
+ .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ new IncrementalIndexSchema.Builder()
+ .withDimensionsSpec(
+ DimensionsSpec.builder()
+ .setDimensions(ImmutableList.of(
+ new StringDimensionSchema("dim1"),
+ new LongDimensionSchema("metric")
+ ))
+ .build()
+ )
+ .withMetrics(
+ new CountAggregatorFactory("cnt"),
+ new SpectatorHistogramAggregatorFactory(
+ "histogram_metric",
+ "metric"
+ )
+ )
+ .withRollup(false)
+ .build()
+ )
+ .rows(ROWS)
+ .buildMMappedIndex();
+
+ return walker.add(
+ DataSegment.builder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .interval(index.getDataInterval())
+ .version("1")
+ .shardSpec(new LinearShardSpec(0))
+ .size(0)
+ .build(),
+ index
+ );
+ }
+ }
+
+ @Test
+ public void testSpectatorPercentileOnPreAggregatedHistogram()
+ {
+ testQuery(
+ "SELECT SPECTATOR_PERCENTILE(histogram_metric, 50) FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ ))
+ .postAggregators(
+ new SpectatorHistogramPercentilePostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ 50.0
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ // Histogram bucket approximation for p50 of [100, 200, 300, 400, 500, 600]
+ new Object[]{341.0}
+ )
+ );
+ }
+
+ @Test
+ public void testSpectatorCountOnPreAggregatedHistogram()
+ {
+ testQuery(
+ "SELECT SPECTATOR_COUNT(histogram_metric) FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ ))
+ .postAggregators(
+ new SpectatorHistogramCountPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg")
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{6L})
+ );
+ }
+
+ @Test
+ public void testSpectatorPercentileOnLongColumn()
+ {
+ // This creates a histogram from the raw long values and then computes the percentile
+ testQuery(
+ "SELECT SPECTATOR_PERCENTILE(metric, 50) FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "metric")
+ ))
+ .postAggregators(
+ new SpectatorHistogramPercentilePostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ 50.0
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ // Histogram bucket approximation for p50 of [100, 200, 300, 400, 500, 600]
+ new Object[]{341.0}
+ )
+ );
+ }
+
+ @Test
+ public void testSpectatorCountOnLongColumn()
+ {
+ // This creates a histogram from the raw values and then counts
+ testQuery(
+ "SELECT SPECTATOR_COUNT(metric) FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "metric")
+ ))
+ .postAggregators(
+ new SpectatorHistogramCountPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg")
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{6L})
+ );
+ }
+
+ @Test
+ public void testSpectatorCountOnLongColumnGroupBy()
+ {
+ testQuery(
+ "SELECT dim1, SPECTATOR_COUNT(metric) FROM foo GROUP BY dim1",
+ Collections.singletonList(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
+ .setAggregatorSpecs(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "metric")
+ )
+ .setPostAggregatorSpecs(
+ new SpectatorHistogramCountPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"a", 1L},
+ new Object[]{"b", 1L},
+ new Object[]{"c", 1L},
+ new Object[]{"d", 1L},
+ new Object[]{"e", 1L},
+ new Object[]{"f", 1L}
+ )
+ );
+ }
+
+ @Test
+ public void testSpectatorCountGroupBy()
+ {
+ testQuery(
+ "SELECT dim1, SPECTATOR_COUNT(histogram_metric) FROM foo GROUP BY dim1",
+ Collections.singletonList(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
+ .setAggregatorSpecs(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ )
+ .setPostAggregatorSpecs(
+ new SpectatorHistogramCountPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg")
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ new Object[]{"a", 1L},
+ new Object[]{"b", 1L},
+ new Object[]{"c", 1L},
+ new Object[]{"d", 1L},
+ new Object[]{"e", 1L},
+ new Object[]{"f", 1L}
+ )
+ );
+ }
+
+ @Test
+ public void testMultipleAggregationsOnSameColumn()
+ {
+ // Test that multiple aggregations on the same column share a single aggregator
+ testQuery(
+ "SELECT SPECTATOR_COUNT(histogram_metric), SPECTATOR_PERCENTILE(histogram_metric, 50), "
+ + "SPECTATOR_PERCENTILE(histogram_metric, 99.99) FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ ))
+ .postAggregators(
+ new SpectatorHistogramCountPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg")
+ ),
+ new SpectatorHistogramPercentilePostAggregator(
+ "a1",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ 50.0
+ ),
+ new SpectatorHistogramPercentilePostAggregator(
+ "a2",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ 99.99
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ // p50 = 341.0, p99.99 = 680.949 (interpolated value near max)
+ new Object[]{6L, 341.0, 680.949}
+ )
+ );
+ }
+
+ @Test
+ public void testSpectatorPercentileGroupBy()
+ {
+ testQuery(
+ "SELECT dim1, SPECTATOR_PERCENTILE(histogram_metric, 50) FROM foo GROUP BY dim1",
+ Collections.singletonList(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
+ .setAggregatorSpecs(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ )
+ .setPostAggregatorSpecs(
+ new SpectatorHistogramPercentilePostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ 50.0
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ // Each row has a single value, so p50 returns the middle of that bucket
+ // Values depend on Spectator histogram bucket boundaries
+ new Object[]{"a", 95.5},
+ new Object[]{"b", 200.5},
+ new Object[]{"c", 298.5},
+ new Object[]{"d", 383.5},
+ new Object[]{"e", 468.5},
+ new Object[]{"f", 638.5}
+ )
+ );
+ }
+
+ @Test
+ public void testSpectatorPercentileWithArray()
+ {
+ testQuery(
+ "SELECT SPECTATOR_PERCENTILE(histogram_metric, ARRAY[25, 50, 75, 99]) FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ ))
+ .postAggregators(
+ new SpectatorHistogramPercentilesPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ new double[]{25.0, 50.0, 75.0, 99.0}
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ // Returns an array of percentile values
+ new Object[]{"[200.5,341.0,468.5,675.9]"}
+ )
+ );
+ }
+
+ @Test
+ public void testSpectatorPercentileWithArrayGroupBy()
+ {
+ testQuery(
+ "SELECT dim1, SPECTATOR_PERCENTILE(histogram_metric, ARRAY[50, 99]) FROM foo GROUP BY dim1",
+ Collections.singletonList(
+ GroupByQuery.builder()
+ .setDataSource(CalciteTests.DATASOURCE1)
+ .setInterval(querySegmentSpec(Filtration.eternity()))
+ .setGranularity(Granularities.ALL)
+ .setDimensions(new DefaultDimensionSpec("dim1", "d0", ColumnType.STRING))
+ .setAggregatorSpecs(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ )
+ .setPostAggregatorSpecs(
+ new SpectatorHistogramPercentilesPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ new double[]{50.0, 99.0}
+ )
+ )
+ .setContext(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(
+ // Returns array of [p50, p99] for each group
+ new Object[]{"a", "[95.5,105.78999999999999]"},
+ new Object[]{"b", "[200.5,210.79]"},
+ new Object[]{"c", "[298.5,340.15]"},
+ new Object[]{"d", "[383.5,425.15]"},
+ new Object[]{"e", "[468.5,510.15]"},
+ new Object[]{"f", "[638.5,680.15]"}
+ )
+ );
+ }
+
+ @Test
+ public void testSpectatorFunctionsOnEmptyHistogram()
+ {
+ // Use a filter that matches no rows to get an empty/null histogram aggregation result
+ // Both COUNT and PERCENTILE return null for empty histogram (same as native behavior)
+ testQuery(
+ "SELECT SPECTATOR_COUNT(histogram_metric), SPECTATOR_PERCENTILE(histogram_metric, 50) "
+ + "FROM foo WHERE dim1 = 'nonexistent'",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .filters(equality("dim1", "nonexistent", ColumnType.STRING))
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "histogram_metric")
+ ))
+ .postAggregators(
+ new SpectatorHistogramCountPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg")
+ ),
+ new SpectatorHistogramPercentilePostAggregator(
+ "a1",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ 50.0
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{null, null})
+ );
+ }
+
+ @Test
+ public void testSpectatorFunctionsOnNullHistogram()
+ {
+ // Both COUNT and PERCENTILE return null for null histogram (same as native behavior)
+ testQuery(
+ "SELECT SPECTATOR_COUNT(null), SPECTATOR_PERCENTILE(null, 99.9), SPECTATOR_PERCENTILE(null, ARRAY[90, 99.9]) FROM foo",
+ Collections.singletonList(
+ Druids.newTimeseriesQueryBuilder()
+ .dataSource(CalciteTests.DATASOURCE1)
+ .intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(Filtration.eternity())))
+ .granularity(Granularities.ALL)
+ .virtualColumns(expressionVirtualColumn("v0", "null", ColumnType.DOUBLE))
+ .aggregators(ImmutableList.of(
+ new SpectatorHistogramAggregatorFactory("a0:agg", "v0")
+ ))
+ .postAggregators(
+ new SpectatorHistogramCountPostAggregator(
+ "a0",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg")
+ ),
+ new SpectatorHistogramPercentilePostAggregator(
+ "a1",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ 99.9
+ ),
+ new SpectatorHistogramPercentilesPostAggregator(
+ "a2",
+ new FieldAccessPostAggregator("a0:agg", "a0:agg"),
+ new double[]{90.0, 99.9}
+ )
+ )
+ .context(QUERY_CONTEXT_DEFAULT)
+ .build()
+ ),
+ ImmutableList.of(new Object[]{null, null, null})
+ );
+ }
+}
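Several expected values above are explained only as a "histogram bucket approximation", and the GROUP BY comments note that a single-value row's p50 is the middle of its bucket. That is consistent with linear interpolation inside the bucket containing the target rank. The sketch below implements such interpolation over caller-supplied bucket bounds; it is an illustration, not Spectator's `PercentileBuckets` code, and `BucketPercentileSketch` and its bounds are assumptions. For example, if the value 100 fell in a bucket spanning (85, 106], the sketch would return 95.5 for p50 and about 105.79 for p99, in line with the pair that `testSpectatorPercentileWithArrayGroupBy` expects for `dim1 = 'a'`; those bounds were inferred for illustration, not taken from Spectator.

```java
// Hypothetical linear-interpolation model of a bucketed-histogram percentile.
// Bucket bounds are supplied by the caller; this is not Spectator's implementation.
final class BucketPercentileSketch
{
  /**
   * @param upperBounds ascending bucket upper bounds; bucket i spans (upperBounds[i - 1], upperBounds[i]]
   *                    and bucket 0 starts at 0
   * @param counts      observations per bucket, same length as upperBounds
   * @param percentile  a value in [0, 100]
   * @return the interpolated estimate, or NaN when the histogram has no observations
   */
  static double percentile(double[] upperBounds, long[] counts, double percentile)
  {
    long total = 0;
    for (long c : counts) {
      total += c;
    }
    if (total == 0) {
      return Double.NaN;
    }

    final double target = percentile / 100.0 * total;
    long cumulative = 0;
    for (int i = 0; i < counts.length; i++) {
      if (counts[i] == 0) {
        continue;
      }
      if (cumulative + counts[i] >= target) {
        // Assume observations are spread evenly across the bucket and interpolate.
        final double lower = i == 0 ? 0.0 : upperBounds[i - 1];
        final double fraction = (target - cumulative) / counts[i];
        return lower + (upperBounds[i] - lower) * fraction;
      }
      cumulative += counts[i];
    }
    return upperBounds[upperBounds.length - 1];
  }
}
```

Because the estimate depends only on bucket bounds, a single value of 100 can report a p99 above 100, which is why the expected results are approximations rather than the raw inputs.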
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java
index 44203e92702..19d25508bc8 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java
@@ -68,6 +68,7 @@ public class PostAggregatorIds
public static final byte KLL_FLOATS_SKETCH_TO_STRING_CACHE_TYPE_ID = 44;
public static final byte SPECTATOR_HISTOGRAM_SKETCH_PERCENTILE_CACHE_TYPE_ID = 45;
public static final byte SPECTATOR_HISTOGRAM_SKETCH_PERCENTILES_CACHE_TYPE_ID = 46;
+ public static final byte SPECTATOR_HISTOGRAM_SKETCH_COUNT_CACHE_TYPE_ID = 47;
public static final byte DDSKETCH_QUANTILES_TYPE_ID = 51;
public static final byte DDSKETCH_QUANTILE_TYPE_ID = 52;
public static final byte BITMAP64_EXACT_COUNT_TYPE_ID = 53;
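The new `SPECTATOR_HISTOGRAM_SKETCH_COUNT_CACHE_TYPE_ID` takes 47, right after the percentile (45) and percentiles (46) IDs. A distinct ID matters because it typically leads a post-aggregator's cache key, so two post-aggregator types over the same field must not produce identical bytes. The sketch below uses a hypothetical key layout (one type-ID byte plus the UTF-8 field name) purely to illustrate the collision being avoided; `CacheKeySketch` is an invented name and Druid's actual cache-key construction is not reproduced.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical cache-key layout: one type-ID byte followed by the UTF-8 field name.
// This only illustrates why each post-aggregator type needs its own ID.
final class CacheKeySketch
{
  static final byte PERCENTILE_ID = 45;  // SPECTATOR_HISTOGRAM_SKETCH_PERCENTILE_CACHE_TYPE_ID
  static final byte PERCENTILES_ID = 46; // SPECTATOR_HISTOGRAM_SKETCH_PERCENTILES_CACHE_TYPE_ID
  static final byte COUNT_ID = 47;       // SPECTATOR_HISTOGRAM_SKETCH_COUNT_CACHE_TYPE_ID

  static byte[] key(byte typeId, String fieldName)
  {
    final byte[] name = fieldName.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(1 + name.length).put(typeId).put(name).array();
  }
}
```

Had the count post-aggregator reused 45, a count and a percentile over `a0:agg` would share a key prefix and could collide in a result cache under this layout.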
diff --git a/website/.spelling b/website/.spelling
index a6e7cf57388..3486f8a8d88 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -2445,12 +2445,15 @@ ZGC
- ../docs/development/extensions-contrib/spectator-histogram.md
SpectatorHistogram
+SPECTATOR_PERCENTILE
+SPECTATOR_COUNT
PercentileBuckets
spectatorHistogram
spectatorHistogramTimer
spectatorHistogramDistribution
percentileSpectatorHistogram
percentilesSpectatorHistogram
+countSpectatorHistogram
- ../docs/development/extensions-contrib/ddsketch-quantiles.md
quantilesFromDDSketch