This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d491a108a5 Propagate MSQ distinct early termination flags (#18648)
4d491a108a5 is described below

commit 4d491a108a538589c9172ecb55406f17f5ed0aac
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Jun 2 13:56:02 2026 -0700

    Propagate MSQ distinct early termination flags (#18648)
---
 .../org/apache/pinot/common/datatable/StatMap.java | 73 ++++++++++++++++-
 .../response/broker/BrokerResponseNativeV2.java    | 15 +++-
 .../apache/pinot/common/datatable/StatMapTest.java | 49 +++++++++--
 .../broker/BrokerResponseNativeV2Test.java         | 53 ++++++++++++
 .../tests/custom/DistinctQueriesTest.java          | 28 ++++++-
 .../pinot/query/runtime/operator/LeafOperator.java | 26 +++++-
 .../query/runtime/operator/LeafOperatorTest.java   | 95 ++++++++++++++++++++++
 7 files changed, 323 insertions(+), 16 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
index 9754a0a4dff..0c2cb4184a1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.datatable;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import java.io.DataInput;
@@ -26,8 +27,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -149,6 +152,24 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
     return this;
   }
 
+  public Set<String> getStringSet(K key) {
+    Preconditions.checkArgument(key.getType() == Type.STRING_SET, "Key %s is of type %s, not STRING_SET", key,
+        key.getType());
+    Object o = _map.get(key);
+    return o == null ? Set.of() : (Set<String>) o;
+  }
+
+  public StatMap<K> merge(K key, Set<String> value) {
+    Set<String> oldValue = getStringSet(key);
+    Set<String> newValue = key.merge(oldValue, value);
+    if (newValue.isEmpty()) {
+      _map.remove(key);
+    } else {
+      _map.put(key, Collections.unmodifiableSet(new LinkedHashSet<>(newValue)));
+    }
+    return this;
+  }
+
   /**
    * Returns the value associated with the key.
    * <p>
@@ -164,6 +185,8 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
         return getLong(key);
       case STRING:
         return getString(key);
+      case STRING_SET:
+        return getStringSet(key);
       default:
         throw new IllegalArgumentException("Unsupported type: " + key.getType());
     }
@@ -216,6 +239,9 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
         case STRING:
           merge(key, (String) value);
           break;
+        case STRING_SET:
+          merge(key, (Set<String>) value);
+          break;
         default:
           throw new IllegalArgumentException("Unsupported type: " + key.getType());
       }
@@ -261,6 +287,14 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
         case STRING:
           merge(key, input.readUTF());
           break;
+        case STRING_SET:
+          int size = input.readInt();
+          LinkedHashSet<String> values = new LinkedHashSet<>(size);
+          for (int j = 0; j < size; j++) {
+            values.add(input.readUTF());
+          }
+          merge(key, values);
+          break;
         default:
           throw new IllegalStateException("Unknown type " + key.getType());
       }
@@ -311,6 +345,18 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
             node.put(key.getStatName(), (String) value);
           }
           break;
+        case STRING_SET:
+          if (value == null) {
+            if (key.includeDefaultInJson()) {
+              node.putArray(key.getStatName());
+            }
+          } else {
+            ArrayNode arrayNode = node.putArray(key.getStatName());
+            for (String stringValue : (Set<String>) value) {
+              arrayNode.add(stringValue);
+            }
+          }
+          break;
         default:
           throw new IllegalArgumentException("Unsupported type: " + key.getType());
       }
@@ -366,6 +412,18 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
           }
           break;
         }
+        case STRING_SET: {
+          Set<String> value = getStringSet(key);
+          if (!value.isEmpty()) {
+            writtenKeys++;
+            output.writeByte(ordinal);
+            output.writeInt(value.size());
+            for (String stringValue : value) {
+              output.writeUTF(stringValue);
+            }
+          }
+          break;
+        }
         default:
           throw new IllegalStateException("Unknown type " + key.getType());
       }
@@ -398,6 +456,12 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
             throw new IllegalStateException("String value must be non-null but null is stored for key " + key);
           }
           break;
+        case STRING_SET:
+          if (value == null || ((Set<String>) value).isEmpty()) {
+            throw new IllegalStateException("String set value must be non-empty but " + value + " is stored for key "
+                + key);
+          }
+          break;
         default:
           throw new IllegalArgumentException("Unsupported type: " + key.getType());
       }
@@ -493,6 +557,12 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
       return value2 != null ? value2 : value1;
     }
 
+    default Set<String> merge(Set<String> value1, Set<String> value2) {
+      LinkedHashSet<String> merged = new LinkedHashSet<>(value1);
+      merged.addAll(value2);
+      return merged;
+    }
+
     /**
      * The type of the values associated to this key.
      */
@@ -540,6 +610,7 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
     BOOLEAN,
     INT,
     LONG,
-    STRING
+    STRING,
+    STRING_SET
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 69ae90b675a..b14e8bd6fea 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -38,8 +38,8 @@ import org.apache.pinot.common.response.ProcessingException;
  */
 @JsonPropertyOrder({
     "resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
-    "numGroupsWarningLimitReached", "numGroups", "maxRowsInJoinReached", "maxRowsInJoin",
-    "maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats",
+    "numGroupsWarningLimitReached", "numGroups", "earlyTerminationReasons", "maxRowsInJoinReached",
+    "maxRowsInJoin", "maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats",
     "maxRowsInOperator", "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs",
     "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
     "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
@@ -112,7 +112,8 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
   @JsonProperty(access = JsonProperty.Access.READ_ONLY)
   @Override
   public boolean isPartialResult() {
-    return getExceptionsSize() > 0 || isNumGroupsLimitReached() || isMaxRowsInJoinReached();
+    return getExceptionsSize() > 0 || isNumGroupsLimitReached() || !getEarlyTerminationReasons().isEmpty()
+        || isMaxRowsInJoinReached();
   }
 
   @Override
@@ -163,6 +164,11 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
     _brokerStats.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED, numGroupsWarningLimitReached);
   }
 
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  public List<String> getEarlyTerminationReasons() {
+    return List.copyOf(_brokerStats.getStringSet(StatKey.EARLY_TERMINATION_REASONS));
+  }
+
   @Override
   public boolean isMaxRowsInJoinReached() {
     return _maxRowsInJoinReached;
@@ -487,7 +493,8 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
       public long merge(long value1, long value2) {
         return Math.max(value1, value2);
       }
-    };
+    },
+    EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET);
 
     private final StatMap.Type _type;
 
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java b/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java
index b6b19a2b666..b8ec1831f79 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java
@@ -22,6 +22,9 @@ import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.testng.Assert;
 import org.testng.SkipException;
@@ -70,6 +73,15 @@ public class StatMapTest {
     statMap.merge(stat, "foo");
   }
 
+  @Test(dataProvider = "allTypeStats", expectedExceptions = 
IllegalArgumentException.class)
+  public void dynamicTypeCheckPutStringSet(MyStats stat) {
+    if (stat.getType() == StatMap.Type.STRING_SET) {
+      throw new SkipException("Skipping STRING_SET test");
+    }
+    StatMap<MyStats> statMap = new StatMap<>(MyStats.class);
+    statMap.merge(stat, Set.of("foo"));
+  }
+
   @Test(dataProvider = "allTypeStats")
   public void singleEncodeDecode(MyStats stat)
       throws IOException {
@@ -87,6 +99,9 @@ public class StatMapTest {
       case STRING:
         statMap.merge(stat, "foo");
         break;
+      case STRING_SET:
+        statMap.merge(stat, Set.of("foo"));
+        break;
       default:
         throw new IllegalStateException();
     }
@@ -111,6 +126,9 @@ public class StatMapTest {
         case STRING:
           statMap.merge(stat, "foo");
           break;
+        case STRING_SET:
+          statMap.merge(stat, Set.of("foo"));
+          break;
         default:
           throw new IllegalStateException();
       }
@@ -118,6 +136,18 @@ public class StatMapTest {
     testSerializeDeserialize(statMap);
   }
 
+  @Test
+  public void stringSetPreservesOrderAndDeduplicates() {
+    StatMap<MyStats> statMap = new StatMap<>(MyStats.class)
+        .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar"))
+        .merge(MyStats.STR_SET_KEY, stringSet("foo", "baz"));
+
+    
Assert.assertEquals(List.copyOf(statMap.getStringSet(MyStats.STR_SET_KEY)), 
List.of("foo", "bar", "baz"));
+    Assert.assertEquals(statMap.asJson().get("strSetKey").get(0).asText(), 
"foo");
+    Assert.assertEquals(statMap.asJson().get("strSetKey").get(1).asText(), 
"bar");
+    Assert.assertEquals(statMap.asJson().get("strSetKey").get(2).asText(), 
"baz");
+  }
+
   private <K extends Enum<K> & StatMap.Key> void 
testSerializeDeserialize(StatMap<K> statMap)
       throws IOException {
     ByteArrayDataOutput output = ByteStreams.newDataOutput();
@@ -153,22 +183,26 @@ public class StatMapTest {
         .merge(MyStats.BOOL_KEY, true)
         .merge(MyStats.LONG_KEY, 1L)
         .merge(MyStats.INT_KEY, 1)
-        .merge(MyStats.STR_KEY, "foo"),
+        .merge(MyStats.STR_KEY, "foo")
+        .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
       new StatMap<>(MyStats.class)
         .merge(MyStats.BOOL_KEY, false)
         .merge(MyStats.LONG_KEY, 1L)
         .merge(MyStats.INT_KEY, 1)
-        .merge(MyStats.STR_KEY, "foo"),
+        .merge(MyStats.STR_KEY, "foo")
+        .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
       new StatMap<>(MyStats.class)
         .merge(MyStats.BOOL_KEY, true)
         .merge(MyStats.LONG_KEY, 0L)
         .merge(MyStats.INT_KEY, 1)
-        .merge(MyStats.STR_KEY, "foo"),
+        .merge(MyStats.STR_KEY, "foo")
+        .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
       new StatMap<>(MyStats.class)
         .merge(MyStats.BOOL_KEY, false)
         .merge(MyStats.LONG_KEY, 1L)
         .merge(MyStats.INT_KEY, 0)
-        .merge(MyStats.STR_KEY, "foo"),
+        .merge(MyStats.STR_KEY, "foo")
+        .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
       new StatMap<>(MyStats.class)
         .merge(MyStats.BOOL_KEY, false)
         .merge(MyStats.LONG_KEY, 1L)
@@ -188,11 +222,16 @@ public class StatMapTest {
     return MyStats.values();
   }
 
+  private static Set<String> stringSet(String... values) {
+    return new LinkedHashSet<>(List.of(values));
+  }
+
   public enum MyStats implements StatMap.Key {
     BOOL_KEY(StatMap.Type.BOOLEAN),
     LONG_KEY(StatMap.Type.LONG),
     INT_KEY(StatMap.Type.INT),
-    STR_KEY(StatMap.Type.STRING);
+    STR_KEY(StatMap.Type.STRING),
+    STR_SET_KEY(StatMap.Type.STRING_SET);
 
     private final StatMap.Type _type;
 
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java
new file mode 100644
index 00000000000..320db22557a
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.datatable.StatMap;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BrokerResponseNativeV2Test {
+  @Test
+  public void testEarlyTerminationReasonsMarkPartialResponse() {
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+    assertTrue(brokerResponse.getEarlyTerminationReasons().isEmpty());
+    assertFalse(brokerResponse.isPartialResult());
+
+    brokerResponse.addBrokerStats(new 
StatMap<>(BrokerResponseNativeV2.StatKey.class)
+        .merge(BrokerResponseNativeV2.StatKey.EARLY_TERMINATION_REASONS,
+            stringSet("DISTINCT_MAX_ROWS", "DISTINCT_MAX_ROWS_WITHOUT_CHANGE"))
+        .merge(BrokerResponseNativeV2.StatKey.EARLY_TERMINATION_REASONS,
+            stringSet("DISTINCT_MAX_ROWS", "DISTINCT_MAX_EXECUTION_TIME")));
+
+    assertEquals(brokerResponse.getEarlyTerminationReasons(),
+        List.of("DISTINCT_MAX_ROWS", "DISTINCT_MAX_ROWS_WITHOUT_CHANGE", 
"DISTINCT_MAX_EXECUTION_TIME"));
+    assertTrue(brokerResponse.isPartialResult());
+  }
+
+  private static Set<String> stringSet(String... values) {
+    return new LinkedHashSet<>(List.of(values));
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
index 808a966d117..2d521a775e0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
@@ -33,13 +33,15 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
-/**
- * Integration test for distinct early termination query options. Uses SSE (v1) only because these options
- * are enforced in DistinctCombineOperator which is not used by the multi-stage query engine.
- */
+/// Integration test for distinct early termination query options.
+///
+/// SSE responses expose reason-specific boolean fields. MSQ responses expose the aggregated early termination reasons
+/// and mark the response as partial.
 @Test(suiteName = "CustomClusterIntegrationTest")
 public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest 
{
   private static final String TABLE_NAME = "DistinctQueriesCustomTest";
@@ -138,6 +140,24 @@ public class DistinctQueriesTest extends 
CustomDataQueryClusterIntegrationTest {
         "partialResult should be true. Response: " + response);
   }
 
+  /// Tests `maxRowsInDistinct` with MSQ: the leaf SSE execution sets early 
termination metadata and the multi-stage
+  /// response marks itself partial with the aggregated reason.
+  @Test
+  public void testMultiStageMaxRowsInDistinctEarlyTermination()
+      throws Exception {
+    setUseMultiStageQueryEngine(true);
+    String sql = String.format("SELECT DISTINCT %s FROM %s LIMIT 10000", 
STRING_COL, getTableName());
+    JsonNode response = postQueryWithOptions(sql, "maxRowsInDistinct=1");
+
+    assertTrue(response.path("exceptions").isEmpty(), "expected no exceptions. 
Response: " + response);
+    assertTrue(response.path("partialResult").asBoolean(false),
+        "partialResult should be true. Response: " + response);
+    assertEquals(response.path("earlyTerminationReasons").path(0).asText(), 
"DISTINCT_MAX_ROWS",
+        "expected distinct early termination reason. Response: " + response);
+    assertFalse(response.has("maxRowsInDistinctReached"),
+        "MSQ response should expose earlyTerminationReasons instead of legacy 
V1 flags. Response: " + response);
+  }
+
   /**
    * Tests maxRowsWithoutChangeInDistinct: when merging a segment adds no new 
distinct values, the
    * segment's numDocsScanned counts toward the no-change budget. With 2 
identical segments, the
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 5a650941d82..cb4a8c3b424 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
@@ -44,6 +45,7 @@ import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason;
 import org.apache.pinot.core.operator.blocks.results.ExplainV2ResultBlock;
 import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -414,8 +416,10 @@ public class LeafOperator extends MultiStageOperator {
         case NUM_CONSUMING_SEGMENTS_MATCHED:
           _statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_MATCHED, Integer.parseInt(entry.getValue()));
           break;
-        case SORTED:
         case EARLY_TERMINATION_REASON:
+          mergeEarlyTerminationReason(entry.getValue());
+          break;
+        case SORTED:
           break;
         default:
           throw new IllegalArgumentException("Unhandled leaf execution stat: " + key);
@@ -423,6 +427,20 @@ public class LeafOperator extends MultiStageOperator {
     }
   }
 
+  private void mergeEarlyTerminationReason(@Nullable String earlyTerminationReason) {
+    if (earlyTerminationReason == null || earlyTerminationReason.isEmpty()) {
+      return;
+    }
+    try {
+      EarlyTerminationReason reason = EarlyTerminationReason.valueOf(earlyTerminationReason);
+      if (reason != EarlyTerminationReason.NONE) {
+        _statMap.merge(StatKey.EARLY_TERMINATION_REASONS, Set.of(reason.name()));
+      }
+    } catch (IllegalArgumentException e) {
+      LOGGER.debug("Skipping unknown early termination reason: {}", earlyTerminationReason);
+    }
+  }
+
   private ExplainedNode asNode(ExplainInfo info) {
     int size = info.getInputs().size();
     List<PlanNode> inputs = new ArrayList<>(size);
@@ -749,7 +767,8 @@ public class LeafOperator extends MultiStageOperator {
     /**
      * Time spent in single-stage execution engine for this leaf stage.
      */
-    SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null);
+    SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null),
+    EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET);
     // IMPORTANT: When adding new StatKeys, make sure to either create the same key in BrokerResponseNativeV2.StatKey or
     //  call the constructor that accepts a String as last argument and set it to null.
     //  Otherwise the constructor will fail with an IllegalArgumentException which will not be caught and will
@@ -798,6 +817,9 @@ public class LeafOperator extends MultiStageOperator {
           case STRING:
             oldMetadata.merge(_brokerKey, stats.getString(this));
             break;
+          case STRING_SET:
+            oldMetadata.merge(_brokerKey, stats.getStringSet(this));
+            break;
           default:
             throw new IllegalStateException("Unsupported type: " + _type);
         }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
index e69ffac27cc..685055773a2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,12 +30,17 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.Table;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import 
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason;
 import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
@@ -50,6 +56,7 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.AfterClass;
@@ -63,6 +70,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 
@@ -408,6 +416,93 @@ public class LeafOperatorTest {
     operator.close();
   }
 
+  @Test
+  public void shouldPropagateDistinctEarlyTerminationReason() {
+    // Given:
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT DISTINCT intCol FROM tbl");
+    DataSchema schema = new DataSchema(new String[]{"intCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new 
MetadataResultsBlock());
+    
metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(),
+        EarlyTerminationReason.DISTINCT_MAX_ROWS.name());
+    QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), 
metadataBlock);
+    LeafOperator operator =
+        new LeafOperator(OperatorTestUtil.getTracingContext(), 
mockQueryRequests(1), schema, queryExecutor,
+            _executorService);
+    _operatorRef.set(operator);
+
+    // When:
+    assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the 
metadata block");
+
+    // Then:
+    StatMap<LeafOperator.StatKey> leafStats = operator.copyStatMaps();
+    
assertEquals(List.copyOf(leafStats.getStringSet(LeafOperator.StatKey.EARLY_TERMINATION_REASONS)),
+        List.of(EarlyTerminationReason.DISTINCT_MAX_ROWS.name()));
+
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+    MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats);
+    assertEquals(brokerResponse.getEarlyTerminationReasons(), 
List.of(EarlyTerminationReason.DISTINCT_MAX_ROWS.name()));
+    assertTrue(brokerResponse.isPartialResult());
+    JsonNode responseJson = JsonUtils.objectToJsonNode(brokerResponse);
+    assertEquals(responseJson.path("earlyTerminationReasons").path(0).asText(),
+        EarlyTerminationReason.DISTINCT_MAX_ROWS.name());
+    assertFalse(responseJson.has("maxRowsInDistinctReached"));
+    assertFalse(responseJson.has("maxRowsWithoutChangeInDistinctReached"));
+    assertFalse(responseJson.has("maxExecutionTimeInDistinctReached"));
+    assertTrue(responseJson.path("partialResult").asBoolean(false));
+
+    operator.close();
+  }
+
+  @Test
+  public void shouldSkipNoneEarlyTerminationReason() {
+    assertSkippedEarlyTerminationReason(EarlyTerminationReason.NONE.name());
+  }
+
+  @Test
+  public void shouldSkipUnknownEarlyTerminationReason() {
+    assertSkippedEarlyTerminationReason("UNKNOWN_REASON");
+  }
+
+  @Test
+  public void shouldSkipEmptyEarlyTerminationReason() {
+    assertSkippedEarlyTerminationReason("");
+  }
+
+  @Test
+  public void shouldSkipNullEarlyTerminationReason() {
+    assertSkippedEarlyTerminationReason(null);
+  }
+
+  private void assertSkippedEarlyTerminationReason(@Nullable String 
earlyTerminationReason) {
+    // Given:
+    QueryContext queryContext = 
QueryContextConverterUtils.getQueryContext("SELECT DISTINCT intCol FROM tbl");
+    DataSchema schema = new DataSchema(new String[]{"intCol"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new 
MetadataResultsBlock());
+    
metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(),
+        earlyTerminationReason);
+    QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(), 
metadataBlock);
+    LeafOperator operator =
+        new LeafOperator(OperatorTestUtil.getTracingContext(), 
mockQueryRequests(1), schema, queryExecutor,
+            _executorService);
+    _operatorRef.set(operator);
+
+    // When:
+    assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the 
metadata block");
+
+    // Then:
+    StatMap<LeafOperator.StatKey> leafStats = operator.copyStatMaps();
+    
assertTrue(leafStats.getStringSet(LeafOperator.StatKey.EARLY_TERMINATION_REASONS).isEmpty());
+
+    BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+    MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats);
+    assertTrue(brokerResponse.getEarlyTerminationReasons().isEmpty());
+    assertFalse(brokerResponse.isPartialResult());
+
+    operator.close();
+  }
+
   @Test
   public void shouldReturnAggregationResultBlock() {
     // Given:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to