This is an automated email from the ASF dual-hosted git repository.
epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new f6748620743 SOLR-18221 Add support for percentile in rollup for
streaming expressions (#4504)
f6748620743 is described below
commit f6748620743693a652182725bddfeac3f49202f5
Author: Khush Jain <[email protected]>
AuthorDate: Wed Jun 10 19:20:52 2026 -0400
SOLR-18221 Add support for percentile in rollup for streaming expressions
(#4504)
Co-authored-by: Eric Pugh <[email protected]>
---
.../SOLR-18221-support-percentile-in-rollup.yml | 7 ++
solr/core/gradle.lockfile | 2 +-
.../pages/stream-decorator-reference.adoc | 6 +-
solr/solrj-streaming/build.gradle | 1 +
solr/solrj-streaming/gradle.lockfile | 2 +-
.../solrj/io/stream/metrics/PercentileMetric.java | 68 +++++++++----
.../solr/client/solrj/io/stream/StreamingTest.java | 110 +++++++++++++++++++++
7 files changed, 171 insertions(+), 25 deletions(-)
diff --git a/changelog/unreleased/SOLR-18221-support-percentile-in-rollup.yml b/changelog/unreleased/SOLR-18221-support-percentile-in-rollup.yml
new file mode 100644
index 00000000000..2e715d03414
--- /dev/null
+++ b/changelog/unreleased/SOLR-18221-support-percentile-in-rollup.yml
@@ -0,0 +1,7 @@
+title: "Support 'percentile' (per) metric in rollup for streaming expressions"
+type: added
+authors:
+ - name: khushjain
+links:
+ - name: SOLR-18221
+ url: https://issues.apache.org/jira/browse/SOLR-18221
diff --git a/solr/core/gradle.lockfile b/solr/core/gradle.lockfile
index b02dbf6a10a..2acb5f859dd 100644
--- a/solr/core/gradle.lockfile
+++ b/solr/core/gradle.lockfile
@@ -34,7 +34,7 @@ com.ibm.icu:icu4j:77.1=jarValidation,testRuntimeClasspath
com.j256.simplemagic:simplemagic:1.17=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
com.jayway.jsonpath:json-path:2.9.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
com.microsoft.onnxruntime:onnxruntime:1.26.0=jarValidation,testRuntimeClasspath
-com.tdunning:t-digest:3.3=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
+com.tdunning:t-digest:3.3=apiHelper,compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
commons-cli:commons-cli:1.11.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
commons-codec:commons-codec:1.21.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
commons-io:commons-io:2.21.0=apiHelper,compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
diff --git a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
index 5abd5efad11..9cbc8e440be 100644
--- a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
+++ b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
@@ -1448,7 +1448,7 @@ For faster aggregation over low to moderate cardinality fields, the `facet` func
* `StreamExpression` (Mandatory)
* `over`: (Mandatory) A list of fields to group by.
* `metrics`: (Mandatory) The list of metrics to compute.
-Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `missing(col)`, `countDist(col)`.
+Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `missing(col)`, `countDist(col)`, `per(col, percentile)`, where `percentile` is a number from 0 to 100.
=== rollup Syntax
@@ -1467,7 +1467,9 @@ rollup(
avg(a_f),
count(*),
missing(a_i),
- countDist(a_i)
+ countDist(a_i),
+ per(a_i, 50),
+ per(a_f, 75)
)
----
diff --git a/solr/solrj-streaming/build.gradle b/solr/solrj-streaming/build.gradle
index b04d24959c3..710bb45ed61 100644
--- a/solr/solrj-streaming/build.gradle
+++ b/solr/solrj-streaming/build.gradle
@@ -25,6 +25,7 @@ dependencies {
// declare dependencies we use even though already declared by solrj-core
implementation libs.slf4j.api
implementation libs.apache.commons.math3
+ implementation libs.tdunning.tdigest
testImplementation project(':solr:test-framework')
testImplementation project(':solr:core')
diff --git a/solr/solrj-streaming/gradle.lockfile b/solr/solrj-streaming/gradle.lockfile
index d6cc9f7eee4..ecfdb32e82e 100644
--- a/solr/solrj-streaming/gradle.lockfile
+++ b/solr/solrj-streaming/gradle.lockfile
@@ -30,7 +30,7 @@ com.google.protobuf:protobuf-java:3.25.8=annotationProcessor,errorprone,jarValid
com.googlecode.json-simple:json-simple:1.1.1=jarValidation,testRuntimeClasspath
com.j256.simplemagic:simplemagic:1.17=jarValidation,testRuntimeClasspath
com.jayway.jsonpath:json-path:2.9.0=jarValidation,testRuntimeClasspath
-com.tdunning:t-digest:3.3=jarValidation,testRuntimeClasspath
+com.tdunning:t-digest:3.3=compileClasspath,jarValidation,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
commons-cli:commons-cli:1.11.0=jarValidation,testRuntimeClasspath
commons-codec:commons-codec:1.21.0=jarValidation,testRuntimeClasspath
commons-io:commons-io:2.21.0=jarValidation,testCompileClasspath,testRuntimeClasspath
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java
index c4784cd9276..f868315738b 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java
@@ -16,6 +16,7 @@
*/
package org.apache.solr.client.solrj.io.stream.metrics;
+import com.tdunning.math.stats.AVLTreeDigest;
import java.io.IOException;
import java.util.Locale;
import org.apache.solr.client.solrj.io.Tuple;
@@ -24,27 +25,27 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
public class PercentileMetric extends Metric {
- private long longMax = -Long.MIN_VALUE;
- private double doubleMax = -Double.MAX_VALUE;
- private String columnName;
- public PercentileMetric(String columnName, int percentile) {
+ // Compression factor passed to every AVLTreeDigest this metric creates.
+ private static final int COMPRESSION = 100;
+
+ private String columnName;
+ // Requested percentile in [0, 100]; divided by 100 to get the quantile passed to the digest.
+ private double percentile;
+ // Sketch of the numeric values seen by update(); each instance (see newInstance()) gets its own.
+ private AVLTreeDigest digest;
+
+ /**
+ * Creates a {@code per} metric over {@code columnName}.
+ *
+ * @param columnName the tuple field whose values are aggregated
+ * @param percentile the percentile to report, from 0 to 100 inclusive
+ * @throws IllegalArgumentException if {@code percentile} is NaN or outside [0, 100]
+ */
+ public PercentileMetric(String columnName, double percentile) {
init("per", columnName, percentile);
}
public PercentileMetric(StreamExpression expression, StreamFactory factory)
throws IOException {
- // grab all parameters out
String functionName = expression.getFunctionName();
String columnName = factory.getValueOperand(expression, 0);
- int percentile = Integer.parseInt(factory.getValueOperand(expression, 1));
+ String percentileStr = factory.getValueOperand(expression, 1);
- // validate expression contains only what we want.
- if (null == columnName) {
+ // getValueOperand can return null (the columnName check relies on that); reject a null
+ // percentile here too instead of letting Double.parseDouble throw a NullPointerException.
+ if (null == columnName || null == percentileStr) {
throw new IOException(
String.format(
Locale.ROOT,
- "Invalid expression %s - expected %s(columnName)",
+ "Invalid expression %s - expected %s(columnName, percentile)",
expression,
functionName));
}
@@ -53,39 +54,64 @@ public class PercentileMetric extends Metric {
String.format(Locale.ROOT, "Invalid expression %s - unknown operands found", expression));
}
+ // Report a malformed percentile as an invalid expression, like the other operand checks.
+ double percentile;
+ try {
+ percentile = Double.parseDouble(percentileStr);
+ } catch (NumberFormatException e) {
+ throw new IOException(
+ String.format(
+ Locale.ROOT,
+ "Invalid expression %s - percentile '%s' is not a number",
+ expression,
+ percentileStr),
+ e);
+ }
init(functionName, columnName, percentile);
}
- private void init(String functionName, String columnName, int percentile) {
+ private void init(String functionName, String columnName, double percentile) {
+ // Written as a negated range test so NaN, which fails every comparison, is rejected too.
+ if (!(percentile >= 0 && percentile <= 100)) {
+ throw new IllegalArgumentException("percentile must be between 0 and 100, got " + percentile);
+ }
this.columnName = columnName;
+ this.percentile = percentile;
+ this.digest = new AVLTreeDigest(COMPRESSION);
setFunctionName(functionName);
- setIdentifier(functionName, "(", columnName, "," + percentile, ")");
+ setIdentifier(functionName, "(", columnName, "," + formatPercentile(percentile), ")");
}
- @Override
- public Number getValue() {
- if (longMax == Long.MIN_VALUE) {
- return doubleMax;
- } else {
- return longMax;
+ /**
+ * Formats a percentile for the identifier and toExpression(). Whole numbers drop the fractional
+ * part (50.0 becomes "50", giving per(a_i,50)); other values use String.valueOf.
+ */
+ private static String formatPercentile(double percentile) {
+ if (percentile == Math.floor(percentile) && !Double.isInfinite(percentile)) {
+ return Integer.toString((int) percentile);
}
+ return String.valueOf(percentile);
}
+ /** Adds the column's value from {@code tuple} to the digest if it is a non-NaN number. */
@Override
- public String[] getColumns() {
- return new String[] {columnName};
+ public void update(Tuple tuple) {
+ Object value = tuple.get(columnName);
+ // Missing and non-numeric values are ignored. NaN is skipped as well, because
+ // AVLTreeDigest.add rejects it with an exception that would otherwise abort the rollup.
+ if (value instanceof Number number) {
+ double d = number.doubleValue();
+ if (!Double.isNaN(d)) {
+ digest.add(d);
+ }
+ }
}
+ /** Returns the estimated percentile of the values added so far, or null if none were added. */
@Override
- public void update(Tuple tuple) {}
+ public Number getValue() {
+ if (digest.size() == 0) {
+ return null;
+ }
+ return digest.quantile(percentile / 100.0);
+ }
@Override
public Metric newInstance() {
- return new MaxMetric(columnName);
+ return new PercentileMetric(columnName, percentile);
+ }
+
+ @Override
+ public String[] getColumns() {
+ return new String[] {columnName};
}
@Override
public StreamExpressionParameter toExpression(StreamFactory factory) throws IOException {
- return new StreamExpression(getFunctionName()).withParameter(columnName);
+ return new StreamExpression(getFunctionName())
+ .withParameter(columnName)
+ .withParameter(formatPercentile(percentile));
}
}
diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index c838af76ca4..2ce5c3d7afd 100644
--- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -39,6 +39,7 @@ import org.apache.solr.client.solrj.io.comp.FieldComparator;
import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
import org.apache.solr.client.solrj.io.ops.GroupOperation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
@@ -49,6 +50,7 @@ import org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
import org.apache.solr.client.solrj.io.stream.metrics.Metric;
import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric;
import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -1855,6 +1857,114 @@ public class StreamingTest extends SolrCloudTestCase {
}
}
+ @Test
+ public void testPercentileRollupStream() throws Exception {
+
+ helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+ StreamContext streamContext = new StreamContext();
+ SolrClientCache solrClientCache = new SolrClientCache();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ try {
+ // Test percentile metrics in RollupStream
+ SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", "a_s asc");
+ CloudSolrStream stream = new CloudSolrStream(solrConnection, COLLECTIONORALIAS, sParamsA);
+
+ Bucket[] buckets = {new Bucket("a_s")};
+
+ Metric[] metrics = {
+ new PercentileMetric("a_i", 50),
+ new PercentileMetric("a_f", 50),
+ new PercentileMetric("a_i", 0),
+ new PercentileMetric("a_i", 100),
+ new PercentileMetric("a_i", 99.9),
+ new MinMetric("a_i"),
+ new MaxMetric("a_i"),
+ new MinMetric("a_f"),
+ new MaxMetric("a_f"),
+ };
+
+ RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+ rollupStream.setStreamContext(streamContext);
+ List<Tuple> tuples = getTuples(rollupStream);
+
+ assertEquals(3, tuples.size());
+
+ // hello0: a_i = [0, 1, 2, 14], a_f = [1, 2, 5, 10]
+ assertEquals("hello0", tuples.get(0).getString("a_s"));
+ assertPercentileInvariants(tuples.get(0));
+
+ // hello3: a_i = [3, 10, 12, 13], a_f = [3, 6, 8, 9]
+ assertEquals("hello3", tuples.get(1).getString("a_s"));
+ assertPercentileInvariants(tuples.get(1));
+
+ // hello4: a_i = [4, 11], a_f = [4, 7]
+ assertEquals("hello4", tuples.get(2).getString("a_s"));
+ assertPercentileInvariants(tuples.get(2));
+
+ // toExpression renders whole-number percentiles without a fractional part
+ PercentileMetric pm = new PercentileMetric("a_i", 50);
+ StreamExpressionParameter expr = pm.toExpression(streamFactory);
+ assertEquals("per(a_i,50)", expr.toString());
+
+ pm = new PercentileMetric("a_i", 99.9);
+ expr = pm.toExpression(streamFactory);
+ assertEquals("per(a_i,99.9)", expr.toString());
+
+ // Test HashRollupStream with percentile
+ stream = new CloudSolrStream(solrConnection, COLLECTIONORALIAS, sParamsA);
+ Metric[] hashMetrics = {new PercentileMetric("a_i", 50), new PercentileMetric("a_f", 50)};
+
+ HashRollupStream hashRollupStream = new HashRollupStream(stream, buckets, hashMetrics);
+ hashRollupStream.setStreamContext(streamContext);
+ tuples = getTuples(hashRollupStream);
+
+ assertEquals(3, tuples.size());
+ for (Tuple t : tuples) {
+ assertNotNull(t.getDouble("per(a_i,50)"));
+ assertNotNull(t.getDouble("per(a_f,50)"));
+ }
+ } finally {
+ solrClientCache.close();
+ }
+ }
+
+ /**
+ * Checks the invariants that the percentile estimates in one rollup bucket must satisfy. The
+ * estimates must be present. The 0th, 99.9th and 100th percentiles must match the bucket's
+ * min/max. Every estimate must lie within the observed range and not decrease with rank.
+ */
+ private static void assertPercentileInvariants(Tuple tuple) {
+ Double peri0 = tuple.getDouble("per(a_i,0)");
+ Double peri50 = tuple.getDouble("per(a_i,50)");
+ Double peri999 = tuple.getDouble("per(a_i,99.9)");
+ Double peri100 = tuple.getDouble("per(a_i,100)");
+ Double perf50 = tuple.getDouble("per(a_f,50)");
+ Double mini = tuple.getDouble("min(a_i)");
+ Double maxi = tuple.getDouble("max(a_i)");
+ Double minf = tuple.getDouble("min(a_f)");
+ Double maxf = tuple.getDouble("max(a_f)");
+
+ assertNotNull(peri0);
+ assertNotNull(peri50);
+ assertNotNull(peri999);
+ assertNotNull(peri100);
+ assertNotNull(perf50);
+
+ // 0th and 100th percentiles should approximate min and max
+ assertEquals(mini, peri0, 0.5);
+ assertEquals(maxi, peri100, 0.5);
+ // with only a handful of values per bucket, the 99.9th percentile should be close to max
+ assertEquals(maxi, peri999, 0.5);
+ // estimates must stay within the observed range and be non-decreasing in rank
+ assertTrue(peri50 >= mini && peri50 <= maxi);
+ assertTrue(perf50 >= minf && perf50 <= maxf);
+ assertTrue(peri0 <= peri50 && peri50 <= peri999 && peri999 <= peri100);
+ }
+
@Test
public void testDaemonTopicStream() throws Exception {
Assume.assumeTrue(!useAlias);