This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4d491a108a5 Propagate MSQ distinct early termination flags (#18648)
4d491a108a5 is described below
commit 4d491a108a538589c9172ecb55406f17f5ed0aac
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Jun 2 13:56:02 2026 -0700
Propagate MSQ distinct early termination flags (#18648)
---
.../org/apache/pinot/common/datatable/StatMap.java | 73 ++++++++++++++++-
.../response/broker/BrokerResponseNativeV2.java | 15 +++-
.../apache/pinot/common/datatable/StatMapTest.java | 49 +++++++++--
.../broker/BrokerResponseNativeV2Test.java | 53 ++++++++++++
.../tests/custom/DistinctQueriesTest.java | 28 ++++++-
.../pinot/query/runtime/operator/LeafOperator.java | 26 +++++-
.../query/runtime/operator/LeafOperatorTest.java | 95 ++++++++++++++++++++++
7 files changed, 323 insertions(+), 16 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
index 9754a0a4dff..0c2cb4184a1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/datatable/StatMap.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.datatable;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import java.io.DataInput;
@@ -26,8 +27,10 @@ import java.io.IOException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -149,6 +152,24 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
return this;
}
+ public Set<String> getStringSet(K key) {
+ Preconditions.checkArgument(key.getType() == Type.STRING_SET, "Key %s is of type %s, not STRING_SET", key,
+ key.getType());
+ Object o = _map.get(key);
+ return o == null ? Set.of() : (Set<String>) o;
+ }
+
+ public StatMap<K> merge(K key, Set<String> value) {
+ Set<String> oldValue = getStringSet(key);
+ Set<String> newValue = key.merge(oldValue, value);
+ if (newValue.isEmpty()) {
+ _map.remove(key);
+ } else {
+ _map.put(key, Collections.unmodifiableSet(new LinkedHashSet<>(newValue)));
+ }
+ return this;
+ }
+
/**
* Returns the value associated with the key.
* <p>
@@ -164,6 +185,8 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
return getLong(key);
case STRING:
return getString(key);
+ case STRING_SET:
+ return getStringSet(key);
default:
throw new IllegalArgumentException("Unsupported type: " +
key.getType());
}
@@ -216,6 +239,9 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
case STRING:
merge(key, (String) value);
break;
+ case STRING_SET:
+ merge(key, (Set<String>) value);
+ break;
default:
throw new IllegalArgumentException("Unsupported type: " +
key.getType());
}
@@ -261,6 +287,14 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
case STRING:
merge(key, input.readUTF());
break;
+ case STRING_SET:
+ int size = input.readInt();
+ LinkedHashSet<String> values = new LinkedHashSet<>(size);
+ for (int j = 0; j < size; j++) {
+ values.add(input.readUTF());
+ }
+ merge(key, values);
+ break;
default:
throw new IllegalStateException("Unknown type " + key.getType());
}
@@ -311,6 +345,18 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
node.put(key.getStatName(), (String) value);
}
break;
+ case STRING_SET:
+ if (value == null) {
+ if (key.includeDefaultInJson()) {
+ node.putArray(key.getStatName());
+ }
+ } else {
+ ArrayNode arrayNode = node.putArray(key.getStatName());
+ for (String stringValue : (Set<String>) value) {
+ arrayNode.add(stringValue);
+ }
+ }
+ break;
default:
throw new IllegalArgumentException("Unsupported type: " +
key.getType());
}
@@ -366,6 +412,18 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
}
break;
}
+ case STRING_SET: {
+ Set<String> value = getStringSet(key);
+ if (!value.isEmpty()) {
+ writtenKeys++;
+ output.writeByte(ordinal);
+ output.writeInt(value.size());
+ for (String stringValue : value) {
+ output.writeUTF(stringValue);
+ }
+ }
+ break;
+ }
default:
throw new IllegalStateException("Unknown type " + key.getType());
}
@@ -398,6 +456,12 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
throw new IllegalStateException("String value must be non-null but null is stored for key " + key);
}
break;
+ case STRING_SET:
+ if (value == null || ((Set<String>) value).isEmpty()) {
+ throw new IllegalStateException("String set value must be non-empty but " + value + " is stored for key "
+ + key);
+ }
+ break;
default:
throw new IllegalArgumentException("Unsupported type: " +
key.getType());
}
@@ -493,6 +557,12 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
return value2 != null ? value2 : value1;
}
+ default Set<String> merge(Set<String> value1, Set<String> value2) {
+ LinkedHashSet<String> merged = new LinkedHashSet<>(value1);
+ merged.addAll(value2);
+ return merged;
+ }
+
/**
* The type of the values associated to this key.
*/
@@ -540,6 +610,7 @@ public class StatMap<K extends Enum<K> & StatMap.Key> {
BOOLEAN,
INT,
LONG,
- STRING
+ STRING,
+ STRING_SET
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index 69ae90b675a..b14e8bd6fea 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -38,8 +38,8 @@ import org.apache.pinot.common.response.ProcessingException;
*/
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions",
"numGroupsLimitReached",
- "numGroupsWarningLimitReached", "numGroups", "maxRowsInJoinReached",
"maxRowsInJoin",
- "maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats",
+ "numGroupsWarningLimitReached", "numGroups", "earlyTerminationReasons",
"maxRowsInJoinReached",
+ "maxRowsInJoin", "maxRowsInWindowReached", "maxRowsInWindow",
"timeUsedMs", "stageStats",
"maxRowsInOperator", "requestId", "clientRequestId", "brokerId",
"numDocsScanned", "totalDocs",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
"numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
"numConsumingSegmentsQueried",
@@ -112,7 +112,8 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
@JsonProperty(access = JsonProperty.Access.READ_ONLY)
@Override
public boolean isPartialResult() {
- return getExceptionsSize() > 0 || isNumGroupsLimitReached() ||
isMaxRowsInJoinReached();
+ return getExceptionsSize() > 0 || isNumGroupsLimitReached() ||
!getEarlyTerminationReasons().isEmpty()
+ || isMaxRowsInJoinReached();
}
@Override
@@ -163,6 +164,11 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
_brokerStats.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED,
numGroupsWarningLimitReached);
}
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ public List<String> getEarlyTerminationReasons() {
+ return List.copyOf(_brokerStats.getStringSet(StatKey.EARLY_TERMINATION_REASONS));
+ }
+
@Override
public boolean isMaxRowsInJoinReached() {
return _maxRowsInJoinReached;
@@ -487,7 +493,8 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
public long merge(long value1, long value2) {
return Math.max(value1, value2);
}
- };
+ },
+ EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET);
private final StatMap.Type _type;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java b/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java
index b6b19a2b666..b8ec1831f79 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/datatable/StatMapTest.java
@@ -22,6 +22,9 @@ import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.IOException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.testng.Assert;
import org.testng.SkipException;
@@ -70,6 +73,15 @@ public class StatMapTest {
statMap.merge(stat, "foo");
}
+ @Test(dataProvider = "allTypeStats", expectedExceptions =
IllegalArgumentException.class)
+ public void dynamicTypeCheckPutStringSet(MyStats stat) {
+ if (stat.getType() == StatMap.Type.STRING_SET) {
+ throw new SkipException("Skipping STRING_SET test");
+ }
+ StatMap<MyStats> statMap = new StatMap<>(MyStats.class);
+ statMap.merge(stat, Set.of("foo"));
+ }
+
@Test(dataProvider = "allTypeStats")
public void singleEncodeDecode(MyStats stat)
throws IOException {
@@ -87,6 +99,9 @@ public class StatMapTest {
case STRING:
statMap.merge(stat, "foo");
break;
+ case STRING_SET:
+ statMap.merge(stat, Set.of("foo"));
+ break;
default:
throw new IllegalStateException();
}
@@ -111,6 +126,9 @@ public class StatMapTest {
case STRING:
statMap.merge(stat, "foo");
break;
+ case STRING_SET:
+ statMap.merge(stat, Set.of("foo"));
+ break;
default:
throw new IllegalStateException();
}
@@ -118,6 +136,18 @@ public class StatMapTest {
testSerializeDeserialize(statMap);
}
+ @Test
+ public void stringSetPreservesOrderAndDeduplicates() {
+ StatMap<MyStats> statMap = new StatMap<>(MyStats.class)
+ .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar"))
+ .merge(MyStats.STR_SET_KEY, stringSet("foo", "baz"));
+
+ Assert.assertEquals(List.copyOf(statMap.getStringSet(MyStats.STR_SET_KEY)), List.of("foo", "bar", "baz"));
+ Assert.assertEquals(statMap.asJson().get("strSetKey").get(0).asText(),
"foo");
+ Assert.assertEquals(statMap.asJson().get("strSetKey").get(1).asText(),
"bar");
+ Assert.assertEquals(statMap.asJson().get("strSetKey").get(2).asText(),
"baz");
+ }
+
private <K extends Enum<K> & StatMap.Key> void
testSerializeDeserialize(StatMap<K> statMap)
throws IOException {
ByteArrayDataOutput output = ByteStreams.newDataOutput();
@@ -153,22 +183,26 @@ public class StatMapTest {
.merge(MyStats.BOOL_KEY, true)
.merge(MyStats.LONG_KEY, 1L)
.merge(MyStats.INT_KEY, 1)
- .merge(MyStats.STR_KEY, "foo"),
+ .merge(MyStats.STR_KEY, "foo")
+ .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
new StatMap<>(MyStats.class)
.merge(MyStats.BOOL_KEY, false)
.merge(MyStats.LONG_KEY, 1L)
.merge(MyStats.INT_KEY, 1)
- .merge(MyStats.STR_KEY, "foo"),
+ .merge(MyStats.STR_KEY, "foo")
+ .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
new StatMap<>(MyStats.class)
.merge(MyStats.BOOL_KEY, true)
.merge(MyStats.LONG_KEY, 0L)
.merge(MyStats.INT_KEY, 1)
- .merge(MyStats.STR_KEY, "foo"),
+ .merge(MyStats.STR_KEY, "foo")
+ .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
new StatMap<>(MyStats.class)
.merge(MyStats.BOOL_KEY, false)
.merge(MyStats.LONG_KEY, 1L)
.merge(MyStats.INT_KEY, 0)
- .merge(MyStats.STR_KEY, "foo"),
+ .merge(MyStats.STR_KEY, "foo")
+ .merge(MyStats.STR_SET_KEY, stringSet("foo", "bar")),
new StatMap<>(MyStats.class)
.merge(MyStats.BOOL_KEY, false)
.merge(MyStats.LONG_KEY, 1L)
@@ -188,11 +222,16 @@ public class StatMapTest {
return MyStats.values();
}
+ private static Set<String> stringSet(String... values) {
+ return new LinkedHashSet<>(List.of(values));
+ }
+
public enum MyStats implements StatMap.Key {
BOOL_KEY(StatMap.Type.BOOLEAN),
LONG_KEY(StatMap.Type.LONG),
INT_KEY(StatMap.Type.INT),
- STR_KEY(StatMap.Type.STRING);
+ STR_KEY(StatMap.Type.STRING),
+ STR_SET_KEY(StatMap.Type.STRING_SET);
private final StatMap.Type _type;
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java
new file mode 100644
index 00000000000..320db22557a
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2Test.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.datatable.StatMap;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BrokerResponseNativeV2Test {
+ @Test
+ public void testEarlyTerminationReasonsMarkPartialResponse() {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ assertTrue(brokerResponse.getEarlyTerminationReasons().isEmpty());
+ assertFalse(brokerResponse.isPartialResult());
+
+ brokerResponse.addBrokerStats(new
StatMap<>(BrokerResponseNativeV2.StatKey.class)
+ .merge(BrokerResponseNativeV2.StatKey.EARLY_TERMINATION_REASONS,
+ stringSet("DISTINCT_MAX_ROWS", "DISTINCT_MAX_ROWS_WITHOUT_CHANGE"))
+ .merge(BrokerResponseNativeV2.StatKey.EARLY_TERMINATION_REASONS,
+ stringSet("DISTINCT_MAX_ROWS", "DISTINCT_MAX_EXECUTION_TIME")));
+
+ assertEquals(brokerResponse.getEarlyTerminationReasons(),
+ List.of("DISTINCT_MAX_ROWS", "DISTINCT_MAX_ROWS_WITHOUT_CHANGE",
"DISTINCT_MAX_EXECUTION_TIME"));
+ assertTrue(brokerResponse.isPartialResult());
+ }
+
+ private static Set<String> stringSet(String... values) {
+ return new LinkedHashSet<>(List.of(values));
+ }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
index 808a966d117..2d521a775e0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/DistinctQueriesTest.java
@@ -33,13 +33,15 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-/**
- * Integration test for distinct early termination query options. Uses SSE
(v1) only because these options
- * are enforced in DistinctCombineOperator which is not used by the
multi-stage query engine.
- */
+/// Integration test for distinct early termination query options.
+///
+/// SSE responses expose reason-specific boolean fields. MSQ responses expose the aggregated early termination reasons
+/// and mark the response as partial.
@Test(suiteName = "CustomClusterIntegrationTest")
public class DistinctQueriesTest extends CustomDataQueryClusterIntegrationTest
{
private static final String TABLE_NAME = "DistinctQueriesCustomTest";
@@ -138,6 +140,24 @@ public class DistinctQueriesTest extends
CustomDataQueryClusterIntegrationTest {
"partialResult should be true. Response: " + response);
}
+ /// Tests `maxRowsInDistinct` with MSQ: the leaf SSE execution sets early termination metadata and the multi-stage
+ /// response marks itself partial with the aggregated reason.
+ @Test
+ public void testMultiStageMaxRowsInDistinctEarlyTermination()
+ throws Exception {
+ setUseMultiStageQueryEngine(true);
+ String sql = String.format("SELECT DISTINCT %s FROM %s LIMIT 10000",
STRING_COL, getTableName());
+ JsonNode response = postQueryWithOptions(sql, "maxRowsInDistinct=1");
+
+ assertTrue(response.path("exceptions").isEmpty(), "expected no exceptions. Response: " + response);
+ assertTrue(response.path("partialResult").asBoolean(false),
+ "partialResult should be true. Response: " + response);
+ assertEquals(response.path("earlyTerminationReasons").path(0).asText(),
"DISTINCT_MAX_ROWS",
+ "expected distinct early termination reason. Response: " + response);
+ assertFalse(response.has("maxRowsInDistinctReached"),
+ "MSQ response should expose earlyTerminationReasons instead of legacy V1 flags. Response: " + response);
+ }
+
/**
* Tests maxRowsWithoutChangeInDistinct: when merging a segment adds no new
distinct values, the
* segment's numDocsScanned counts toward the no-change budget. With 2
identical segments, the
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 5a650941d82..cb4a8c3b424 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -44,6 +45,7 @@ import
org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason;
import org.apache.pinot.core.operator.blocks.results.ExplainV2ResultBlock;
import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
@@ -414,8 +416,10 @@ public class LeafOperator extends MultiStageOperator {
case NUM_CONSUMING_SEGMENTS_MATCHED:
_statMap.merge(StatKey.NUM_CONSUMING_SEGMENTS_MATCHED,
Integer.parseInt(entry.getValue()));
break;
- case SORTED:
case EARLY_TERMINATION_REASON:
+ mergeEarlyTerminationReason(entry.getValue());
+ break;
+ case SORTED:
break;
default:
throw new IllegalArgumentException("Unhandled leaf execution stat: "
+ key);
@@ -423,6 +427,20 @@ public class LeafOperator extends MultiStageOperator {
}
}
+ private void mergeEarlyTerminationReason(@Nullable String
earlyTerminationReason) {
+ if (earlyTerminationReason == null || earlyTerminationReason.isEmpty()) {
+ return;
+ }
+ try {
+ EarlyTerminationReason reason =
EarlyTerminationReason.valueOf(earlyTerminationReason);
+ if (reason != EarlyTerminationReason.NONE) {
+ _statMap.merge(StatKey.EARLY_TERMINATION_REASONS,
Set.of(reason.name()));
+ }
+ } catch (IllegalArgumentException e) {
+ LOGGER.debug("Skipping unknown early termination reason: {}",
earlyTerminationReason);
+ }
+ }
+
private ExplainedNode asNode(ExplainInfo info) {
int size = info.getInputs().size();
List<PlanNode> inputs = new ArrayList<>(size);
@@ -749,7 +767,8 @@ public class LeafOperator extends MultiStageOperator {
/**
* Time spent in single-stage execution engine for this leaf stage.
*/
- SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null);
+ SSE_EXECUTION_TIME_MS(StatMap.Type.LONG, null),
+ EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET);
// IMPORTANT: When adding new StatKeys, make sure to either create the
same key in BrokerResponseNativeV2.StatKey or
// call the constructor that accepts a String as last argument and set it
to null.
// Otherwise the constructor will fail with an IllegalArgumentException
which will not be caught and will
@@ -798,6 +817,9 @@ public class LeafOperator extends MultiStageOperator {
case STRING:
oldMetadata.merge(_brokerKey, stats.getString(this));
break;
+ case STRING_SET:
+ oldMetadata.merge(_brokerKey, stats.getStringSet(this));
+ break;
default:
throw new IllegalStateException("Unsupported type: " + _type);
}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
index e69ffac27cc..685055773a2 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.runtime.operator;
+import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -29,12 +30,17 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
+import
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason;
import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock;
import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.blocks.results.MetadataResultsBlock;
@@ -50,6 +56,7 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterClass;
@@ -63,6 +70,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -408,6 +416,93 @@ public class LeafOperatorTest {
operator.close();
}
+ @Test
+ public void shouldPropagateDistinctEarlyTerminationReason() {
+ // Given:
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT DISTINCT intCol FROM tbl");
+ DataSchema schema = new DataSchema(new String[]{"intCol"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+ InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new
MetadataResultsBlock());
+
metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(),
+ EarlyTerminationReason.DISTINCT_MAX_ROWS.name());
+ QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(),
metadataBlock);
+ LeafOperator operator =
+ new LeafOperator(OperatorTestUtil.getTracingContext(),
mockQueryRequests(1), schema, queryExecutor,
+ _executorService);
+ _operatorRef.set(operator);
+
+ // When:
+ assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block");
+
+ // Then:
+ StatMap<LeafOperator.StatKey> leafStats = operator.copyStatMaps();
+
assertEquals(List.copyOf(leafStats.getStringSet(LeafOperator.StatKey.EARLY_TERMINATION_REASONS)),
+ List.of(EarlyTerminationReason.DISTINCT_MAX_ROWS.name()));
+
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats);
+ assertEquals(brokerResponse.getEarlyTerminationReasons(),
List.of(EarlyTerminationReason.DISTINCT_MAX_ROWS.name()));
+ assertTrue(brokerResponse.isPartialResult());
+ JsonNode responseJson = JsonUtils.objectToJsonNode(brokerResponse);
+ assertEquals(responseJson.path("earlyTerminationReasons").path(0).asText(),
+ EarlyTerminationReason.DISTINCT_MAX_ROWS.name());
+ assertFalse(responseJson.has("maxRowsInDistinctReached"));
+ assertFalse(responseJson.has("maxRowsWithoutChangeInDistinctReached"));
+ assertFalse(responseJson.has("maxExecutionTimeInDistinctReached"));
+ assertTrue(responseJson.path("partialResult").asBoolean(false));
+
+ operator.close();
+ }
+
+ @Test
+ public void shouldSkipNoneEarlyTerminationReason() {
+ assertSkippedEarlyTerminationReason(EarlyTerminationReason.NONE.name());
+ }
+
+ @Test
+ public void shouldSkipUnknownEarlyTerminationReason() {
+ assertSkippedEarlyTerminationReason("UNKNOWN_REASON");
+ }
+
+ @Test
+ public void shouldSkipEmptyEarlyTerminationReason() {
+ assertSkippedEarlyTerminationReason("");
+ }
+
+ @Test
+ public void shouldSkipNullEarlyTerminationReason() {
+ assertSkippedEarlyTerminationReason(null);
+ }
+
+ private void assertSkippedEarlyTerminationReason(@Nullable String
earlyTerminationReason) {
+ // Given:
+ QueryContext queryContext =
QueryContextConverterUtils.getQueryContext("SELECT DISTINCT intCol FROM tbl");
+ DataSchema schema = new DataSchema(new String[]{"intCol"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+ InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new
MetadataResultsBlock());
+
metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(),
+ earlyTerminationReason);
+ QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(),
metadataBlock);
+ LeafOperator operator =
+ new LeafOperator(OperatorTestUtil.getTracingContext(),
mockQueryRequests(1), schema, queryExecutor,
+ _executorService);
+ _operatorRef.set(operator);
+
+ // When:
+ assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the metadata block");
+
+ // Then:
+ StatMap<LeafOperator.StatKey> leafStats = operator.copyStatMaps();
+
assertTrue(leafStats.getStringSet(LeafOperator.StatKey.EARLY_TERMINATION_REASONS).isEmpty());
+
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats);
+ assertTrue(brokerResponse.getEarlyTerminationReasons().isEmpty());
+ assertFalse(brokerResponse.isPartialResult());
+
+ operator.close();
+ }
+
@Test
public void shouldReturnAggregationResultBlock() {
// Given:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]