This is an automated email from the ASF dual-hosted git repository.

epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new f6748620743 SOLR-18221 Add support for percentile in rollup for streaming expressions (#4504)
f6748620743 is described below

commit f6748620743693a652182725bddfeac3f49202f5
Author: Khush Jain <[email protected]>
AuthorDate: Wed Jun 10 19:20:52 2026 -0400

    SOLR-18221 Add support for percentile in rollup for streaming expressions (#4504)
    
    Co-authored-by: Eric Pugh <[email protected]>
---
 .../SOLR-18221-support-percentile-in-rollup.yml    |   7 ++
 solr/core/gradle.lockfile                          |   2 +-
 .../pages/stream-decorator-reference.adoc          |   6 +-
 solr/solrj-streaming/build.gradle                  |   1 +
 solr/solrj-streaming/gradle.lockfile               |   2 +-
 .../solrj/io/stream/metrics/PercentileMetric.java  |  68 +++++++++----
 .../solr/client/solrj/io/stream/StreamingTest.java | 110 +++++++++++++++++++++
 7 files changed, 171 insertions(+), 25 deletions(-)

diff --git a/changelog/unreleased/SOLR-18221-support-percentile-in-rollup.yml 
b/changelog/unreleased/SOLR-18221-support-percentile-in-rollup.yml
new file mode 100644
index 00000000000..2e715d03414
--- /dev/null
+++ b/changelog/unreleased/SOLR-18221-support-percentile-in-rollup.yml
@@ -0,0 +1,7 @@
+title: "Support 'percentile' (per) metric in rollup for streaming expressions"
+type: added
+authors:
+  - name: khushjain
+links:
+  - name: SOLR-18221
+    url: https://issues.apache.org/jira/browse/SOLR-18221
diff --git a/solr/core/gradle.lockfile b/solr/core/gradle.lockfile
index b02dbf6a10a..2acb5f859dd 100644
--- a/solr/core/gradle.lockfile
+++ b/solr/core/gradle.lockfile
@@ -34,7 +34,7 @@ com.ibm.icu:icu4j:77.1=jarValidation,testRuntimeClasspath
 
com.j256.simplemagic:simplemagic:1.17=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 
com.jayway.jsonpath:json-path:2.9.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 com.microsoft.onnxruntime:onnxruntime:1.26.0=jarValidation,testRuntimeClasspath
-com.tdunning:t-digest:3.3=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
+com.tdunning:t-digest:3.3=apiHelper,compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 
commons-cli:commons-cli:1.11.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 
commons-codec:commons-codec:1.21.0=compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
 
commons-io:commons-io:2.21.0=apiHelper,compileClasspath,jarValidation,runtimeClasspath,runtimeLibs,testCompileClasspath,testRuntimeClasspath
diff --git 
a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc 
b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
index 5abd5efad11..9cbc8e440be 100644
--- 
a/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
+++ 
b/solr/solr-ref-guide/modules/query-guide/pages/stream-decorator-reference.adoc
@@ -1448,7 +1448,7 @@ For faster aggregation over low to moderate cardinality 
fields, the `facet` func
 * `StreamExpression` (Mandatory)
 * `over`: (Mandatory) A list of fields to group by.
 * `metrics`: (Mandatory) The list of metrics to compute.
-Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `missing(col)`, `countDist(col)`.
+Currently supported metrics are `sum(col)`, `avg(col)`, `min(col)`, `max(col)`, `count(*)`, `missing(col)`, `countDist(col)`, `per(col, percentile)`.
 
 === rollup Syntax
 
@@ -1467,7 +1467,9 @@ rollup(
    avg(a_f),
    count(*),
    missing(a_i),
-   countDist(a_i)
+   countDist(a_i),
+   per(a_i, 50),
+   per(a_f, 75)
 )
 ----
 
diff --git a/solr/solrj-streaming/build.gradle 
b/solr/solrj-streaming/build.gradle
index b04d24959c3..710bb45ed61 100644
--- a/solr/solrj-streaming/build.gradle
+++ b/solr/solrj-streaming/build.gradle
@@ -25,6 +25,7 @@ dependencies {
   // declare dependencies we use even though already declared by solrj-core
   implementation libs.slf4j.api
   implementation libs.apache.commons.math3
+  implementation libs.tdunning.tdigest
 
   testImplementation project(':solr:test-framework')
   testImplementation project(':solr:core')
diff --git a/solr/solrj-streaming/gradle.lockfile 
b/solr/solrj-streaming/gradle.lockfile
index d6cc9f7eee4..ecfdb32e82e 100644
--- a/solr/solrj-streaming/gradle.lockfile
+++ b/solr/solrj-streaming/gradle.lockfile
@@ -30,7 +30,7 @@ 
com.google.protobuf:protobuf-java:3.25.8=annotationProcessor,errorprone,jarValid
 com.googlecode.json-simple:json-simple:1.1.1=jarValidation,testRuntimeClasspath
 com.j256.simplemagic:simplemagic:1.17=jarValidation,testRuntimeClasspath
 com.jayway.jsonpath:json-path:2.9.0=jarValidation,testRuntimeClasspath
-com.tdunning:t-digest:3.3=jarValidation,testRuntimeClasspath
+com.tdunning:t-digest:3.3=compileClasspath,jarValidation,runtimeClasspath,testCompileClasspath,testRuntimeClasspath
 commons-cli:commons-cli:1.11.0=jarValidation,testRuntimeClasspath
 commons-codec:commons-codec:1.21.0=jarValidation,testRuntimeClasspath
 
commons-io:commons-io:2.21.0=jarValidation,testCompileClasspath,testRuntimeClasspath
diff --git 
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java
 
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java
index c4784cd9276..f868315738b 100644
--- 
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java
+++ 
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/metrics/PercentileMetric.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.client.solrj.io.stream.metrics;
 
+import com.tdunning.math.stats.AVLTreeDigest;
 import java.io.IOException;
 import java.util.Locale;
 import org.apache.solr.client.solrj.io.Tuple;
@@ -24,27 +25,27 @@ import 
org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 
 public class PercentileMetric extends Metric {
-  private long longMax = -Long.MIN_VALUE;
-  private double doubleMax = -Double.MAX_VALUE;
-  private String columnName;
 
-  public PercentileMetric(String columnName, int percentile) {
+  private static final int COMPRESSION = 100;
+
+  private String columnName;
+  private double percentile;
+  private AVLTreeDigest digest;
 
+  public PercentileMetric(String columnName, double percentile) {
     init("per", columnName, percentile);
   }
 
   public PercentileMetric(StreamExpression expression, StreamFactory factory) 
throws IOException {
-    // grab all parameters out
     String functionName = expression.getFunctionName();
     String columnName = factory.getValueOperand(expression, 0);
-    int percentile = Integer.parseInt(factory.getValueOperand(expression, 1));
+    String percentileStr = factory.getValueOperand(expression, 1);
 
-    // validate expression contains only what we want.
     if (null == columnName) {
       throw new IOException(
           String.format(
               Locale.ROOT,
-              "Invalid expression %s - expected %s(columnName)",
+              "Invalid expression %s - expected %s(columnName, percentile)",
               expression,
               functionName));
     }
@@ -53,39 +54,64 @@ public class PercentileMetric extends Metric {
           String.format(Locale.ROOT, "Invalid expression %s - unknown operands 
found", expression));
     }
 
+    double percentile = Double.parseDouble(percentileStr);
     init(functionName, columnName, percentile);
   }
 
-  private void init(String functionName, String columnName, int percentile) {
+  private void init(String functionName, String columnName, double percentile) 
{
+    if (percentile < 0 || percentile > 100) {
+      throw new IllegalArgumentException("percentile must be between 0 and 
100, got " + percentile);
+    }
     this.columnName = columnName;
+    this.percentile = percentile;
+    this.digest = new AVLTreeDigest(COMPRESSION);
     setFunctionName(functionName);
-    setIdentifier(functionName, "(", columnName, "," + percentile, ")");
+    setIdentifier(functionName, "(", columnName, "," + 
formatPercentile(percentile), ")");
   }
 
-  @Override
-  public Number getValue() {
-    if (longMax == Long.MIN_VALUE) {
-      return doubleMax;
-    } else {
-      return longMax;
+  private static String formatPercentile(double percentile) {
+    if (percentile == Math.floor(percentile) && 
!Double.isInfinite(percentile)) {
+      return Integer.toString((int) percentile);
     }
+    return String.valueOf(percentile);
   }
 
   @Override
-  public String[] getColumns() {
-    return new String[] {columnName};
+  public void update(Tuple tuple) {
+    Object o = tuple.get(columnName);
+    if (o instanceof Double d) {
+      digest.add(d);
+    } else if (o instanceof Float f) {
+      digest.add(f.doubleValue());
+    } else if (o instanceof Integer i) {
+      digest.add(i.doubleValue());
+    } else if (o instanceof Long l) {
+      digest.add(l.doubleValue());
+    }
   }
 
   @Override
-  public void update(Tuple tuple) {}
+  public Number getValue() {
+    if (digest.size() == 0) {
+      return null;
+    }
+    return digest.quantile(percentile / 100.0);
+  }
 
   @Override
   public Metric newInstance() {
-    return new MaxMetric(columnName);
+    return new PercentileMetric(columnName, percentile);
+  }
+
+  @Override
+  public String[] getColumns() {
+    return new String[] {columnName};
   }
 
   @Override
   public StreamExpressionParameter toExpression(StreamFactory factory) throws 
IOException {
-    return new StreamExpression(getFunctionName()).withParameter(columnName);
+    return new StreamExpression(getFunctionName())
+        .withParameter(columnName)
+        .withParameter(formatPercentile(percentile));
   }
 }
diff --git 
a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
 
b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
index c838af76ca4..2ce5c3d7afd 100644
--- 
a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
+++ 
b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java
@@ -39,6 +39,7 @@ import org.apache.solr.client.solrj.io.comp.FieldComparator;
 import org.apache.solr.client.solrj.io.comp.MultipleFieldComparator;
 import org.apache.solr.client.solrj.io.eq.FieldEqualitor;
 import org.apache.solr.client.solrj.io.ops.GroupOperation;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParameter;
 import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
 import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
 import org.apache.solr.client.solrj.io.stream.metrics.Bucket;
@@ -49,6 +50,7 @@ import 
org.apache.solr.client.solrj.io.stream.metrics.MeanMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.Metric;
 import org.apache.solr.client.solrj.io.stream.metrics.MinMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.MissingMetric;
+import org.apache.solr.client.solrj.io.stream.metrics.PercentileMetric;
 import org.apache.solr.client.solrj.io.stream.metrics.SumMetric;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -1855,6 +1857,114 @@ public class StreamingTest extends SolrCloudTestCase {
     }
   }
 
+  @Test
+  public void testPercentileRollupStream() throws Exception {
+
+    helloDocsUpdateRequest.commit(cluster.getSolrClient(), COLLECTIONORALIAS);
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+    streamContext.setSolrClientCache(solrClientCache);
+
+    try {
+      // Test percentile metrics in RollupStream
+      SolrParams sParamsA = params("q", "*:*", "fl", "a_s,a_i,a_f", "sort", 
"a_s asc");
+      CloudSolrStream stream = new CloudSolrStream(solrConnection, 
COLLECTIONORALIAS, sParamsA);
+
+      Bucket[] buckets = {new Bucket("a_s")};
+
+      Metric[] metrics = {
+        new PercentileMetric("a_i", 50),
+        new PercentileMetric("a_f", 50),
+        new PercentileMetric("a_i", 0),
+        new PercentileMetric("a_i", 100),
+        new PercentileMetric("a_i", 99.9),
+        new MinMetric("a_i"),
+        new MaxMetric("a_i"),
+      };
+
+      RollupStream rollupStream = new RollupStream(stream, buckets, metrics);
+      rollupStream.setStreamContext(streamContext);
+      List<Tuple> tuples = getTuples(rollupStream);
+
+      assertEquals(3, tuples.size());
+
+      // hello0: a_i = [0, 1, 2, 14], a_f = [1, 2, 5, 10]
+      Tuple tuple = tuples.get(0);
+      assertEquals("hello0", tuple.getString("a_s"));
+      Double peri50 = tuple.getDouble("per(a_i,50)");
+      Double perf50 = tuple.getDouble("per(a_f,50)");
+      Double peri0 = tuple.getDouble("per(a_i,0)");
+      Double peri100 = tuple.getDouble("per(a_i,100)");
+      Double peri999 = tuple.getDouble("per(a_i,99.9)");
+      Double mini = tuple.getDouble("min(a_i)");
+      Double maxi = tuple.getDouble("max(a_i)");
+
+      assertNotNull(peri50);
+      assertNotNull(perf50);
+      // 0th percentile should approximate min
+      assertEquals(mini, peri0, 0.5);
+      // 100th percentile should approximate max
+      assertEquals(maxi, peri100, 0.5);
+      // 99.9th percentile should be close to max
+      assertNotNull(peri999);
+
+      // hello3: a_i = [3, 10, 12, 13], a_f = [3, 6, 8, 9]
+      tuple = tuples.get(1);
+      assertEquals("hello3", tuple.getString("a_s"));
+      peri50 = tuple.getDouble("per(a_i,50)");
+      perf50 = tuple.getDouble("per(a_f,50)");
+      peri0 = tuple.getDouble("per(a_i,0)");
+      peri100 = tuple.getDouble("per(a_i,100)");
+      mini = tuple.getDouble("min(a_i)");
+      maxi = tuple.getDouble("max(a_i)");
+
+      assertNotNull(peri50);
+      assertNotNull(perf50);
+      assertEquals(mini, peri0, 0.5);
+      assertEquals(maxi, peri100, 0.5);
+
+      // hello4: a_i = [4, 11], a_f = [4, 7]
+      tuple = tuples.get(2);
+      assertEquals("hello4", tuple.getString("a_s"));
+      peri50 = tuple.getDouble("per(a_i,50)");
+      perf50 = tuple.getDouble("per(a_f,50)");
+      peri0 = tuple.getDouble("per(a_i,0)");
+      peri100 = tuple.getDouble("per(a_i,100)");
+      mini = tuple.getDouble("min(a_i)");
+      maxi = tuple.getDouble("max(a_i)");
+
+      assertNotNull(peri50);
+      assertNotNull(perf50);
+      assertEquals(mini, peri0, 0.5);
+      assertEquals(maxi, peri100, 0.5);
+
+      // Test toExpression round-trip
+      PercentileMetric pm = new PercentileMetric("a_i", 50);
+      StreamExpressionParameter expr = pm.toExpression(streamFactory);
+      assertEquals("per(a_i,50)", expr.toString());
+
+      pm = new PercentileMetric("a_i", 99.9);
+      expr = pm.toExpression(streamFactory);
+      assertEquals("per(a_i,99.9)", expr.toString());
+
+      // Test HashRollupStream with percentile
+      stream = new CloudSolrStream(solrConnection, COLLECTIONORALIAS, 
sParamsA);
+      Metric[] hashMetrics = {new PercentileMetric("a_i", 50), new 
PercentileMetric("a_f", 50)};
+
+      HashRollupStream hashRollupStream = new HashRollupStream(stream, 
buckets, hashMetrics);
+      hashRollupStream.setStreamContext(streamContext);
+      tuples = getTuples(hashRollupStream);
+
+      assertEquals(3, tuples.size());
+      for (Tuple t : tuples) {
+        assertNotNull(t.getDouble("per(a_i,50)"));
+        assertNotNull(t.getDouble("per(a_f,50)"));
+      }
+    } finally {
+      solrClientCache.close();
+    }
+  }
+
   @Test
   public void testDaemonTopicStream() throws Exception {
     Assume.assumeTrue(!useAlias);

Reply via email to