This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4688b5b598e [multistage] Add user-facing warning when MSE Lite hits
implicit query limits (#18725)
4688b5b598e is described below
commit 4688b5b598e3b8579f8afab8b8c144918c231164
Author: Anurag Rai <[email protected]>
AuthorDate: Thu Jun 18 03:33:06 2026 +0530
[multistage] Add user-facing warning when MSE Lite hits implicit query
limits (#18725)
* Surface user-facing warnings when MSE Lite silently truncates query
results
When a query runs under MSE Lite without an explicit LIMIT, the planner
silently inserts a PhysicalSort at the leaf-stage boundary to cap rows
per server. Users get incomplete results with no indication. This change
adds three new fields to BrokerResponseNativeV2
(mseLiteLeafStageLimitReached,
mseLiteLeafStageEffectiveLimit, mseLiteFanOutAdjustedLimitApplied) that
signal when the implicit limit was binding.
Detection happens at execution time on each server via a new
DataTable.MetadataKey, propagated through LeafOperator.StatKey to the
broker response. Both StreamingInstanceResponseOperator (selection queries)
and InstanceResponseOperator (aggregation queries) are instrumented.
* Address PR #18725 review comments for MSE Lite warnings
- Remove redundant _liteModeImplicitSortApplied boolean; derive from
_liteModeEffectiveSortLimit >= 0 (simplifies PhysicalPlannerContext,
QueryEnvironment, QueryPlannerResult)
- Convert ternary to if/else in LiteModeSortInsertRule aggregate branch
- Add comment to LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT explaining it is a
system-internal query option
- Add BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED metric
- Add isMseLiteLeafStageLimitReached to BrokerResponse interface and
RequestContext for broker event listener capture
---
.../requesthandler/BaseBrokerRequestHandler.java | 1 +
.../MultiStageBrokerRequestHandler.java | 19 +++
.../apache/pinot/common/datatable/DataTable.java | 5 +-
.../apache/pinot/common/metrics/BrokerMeter.java | 3 +
.../pinot/common/response/BrokerResponse.java | 7 ++
.../response/broker/BrokerResponseNativeV2.java | 41 ++++++-
.../common/utils/config/QueryOptionsUtils.java | 6 +
.../broker/BrokerResponseNativeV2LiteModeTest.java | 90 +++++++++++++++
.../common/utils/config/QueryOptionsUtilsTest.java | 16 +++
.../core/operator/InstanceResponseOperator.java | 11 ++
.../StreamingInstanceResponseOperator.java | 15 ++-
.../operator/InstanceResponseOperatorTest.java | 127 +++++++++++++++++++++
.../org/apache/pinot/query/QueryEnvironment.java | 21 +++-
.../query/context/PhysicalPlannerContext.java | 13 +++
.../v2/opt/rules/LiteModeSortInsertRule.java | 10 +-
.../pinot/query/runtime/operator/LeafOperator.java | 4 +
.../query/runtime/operator/LeafOperatorTest.java | 59 ++++++++++
.../pinot/spi/trace/DefaultRequestContext.java | 5 +
.../org/apache/pinot/spi/trace/RequestContext.java | 2 +
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
20 files changed, 449 insertions(+), 9 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 1517b698a9d..dc8e1cbba57 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -351,6 +351,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
statistics.setNumExceptions(numExceptions);
statistics.setGroupsTrimmed(response.isGroupsTrimmed());
statistics.setNumGroupsLimitReached(response.isNumGroupsLimitReached());
+
statistics.setMseLiteLeafStageLimitReached(response.isMseLiteLeafStageLimitReached());
statistics.setProcessingTimeMillis(response.getTimeUsedMs());
statistics.setNumDocsScanned(response.getNumDocsScanned());
statistics.setTotalDocs(response.getTotalDocs());
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index ee5f62a487b..ddd8c00da83 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -651,6 +651,12 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
+ // Inject the implicit leaf-stage limit as a query option so servers can
detect truncation
+ if (queryPlanResult.isLiteModeImplicitSortApplied()) {
+
query.getOptions().put(CommonConstants.Broker.Request.QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT,
+ String.valueOf(queryPlanResult.getLiteModeEffectiveSortLimit()));
+ }
+
// Optionally set ignoreMissingSegments query option based on broker
config if not already set.
if
(_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS,
CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) {
@@ -830,6 +836,15 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
}
}
+ // Set MSE Lite planning-time warning fields
+ if (queryPlanResult.isLiteModeImplicitSortApplied()) {
+ int effectiveLimit = queryPlanResult.getLiteModeEffectiveSortLimit();
+ brokerResponse.setMseLiteLeafStageEffectiveLimit(effectiveLimit);
+ brokerResponse.setMseLiteFanOutAdjustedLimitApplied(
+ effectiveLimit !=
QueryOptionsUtils.getLiteModeLeafStageLimit(query.getOptions(),
+ CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT));
+ }
+
long totalTimeMs = System.currentTimeMillis() -
requestContext.getRequestArrivalTimeMillis();
_brokerMetrics.addTimedValue(BrokerTimer.MULTI_STAGE_QUERY_TOTAL_TIME_MS,
totalTimeMs, TimeUnit.MILLISECONDS);
@@ -842,6 +857,10 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
if (brokerResponse.isGroupsTrimmed()) {
_brokerMetrics.addMeteredTableValue(table,
BrokerMeter.BROKER_RESPONSES_WITH_GROUPS_TRIMMED, 1);
}
+ if (brokerResponse.isMseLiteLeafStageLimitReached()) {
+ _brokerMetrics.addMeteredTableValue(table,
+
BrokerMeter.BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED, 1);
+ }
}
brokerResponse.setTimeUsedMs(totalTimeMs);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index ddba67d92df..afb6665e30f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -161,11 +161,12 @@ public interface DataTable {
// the merge (e.g., due to a schema conflict), so the merge ran over a
strict subset of the
// inputs. How a downstream consumer reacts (skip, retry, accept with
annotation) is the
// consumer's policy.
- INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING);
+ INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING),
+ LITE_MODE_LEAF_STAGE_LIMIT_REACHED(44, "liteModeLeafStageLimitReached",
MetadataValueType.STRING);
// We keep this constant to track the max id added so far for backward
compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
- private static final int MAX_ID = INCOMPLETE_MERGE.getId();
+ private static final int MAX_ID =
LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getId();
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new
MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new
HashMap<>();
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index 614d666c8a4..1bcd8c37b2d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -174,6 +174,9 @@ public class BrokerMeter implements AbstractMetrics.Meter {
public static final BrokerMeter
BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED = create(
"BROKER_RESPONSES_WITH_NUM_GROUPS_LIMIT_REACHED", "badResponses", false);
+ public static final BrokerMeter
BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED = create(
+ "BROKER_RESPONSES_WITH_MSE_LITE_LEAF_STAGE_LIMIT_REACHED",
"badResponses", false);
+
// These metrics track the cost of the query.
public static final BrokerMeter DOCUMENTS_SCANNED = create(
"DOCUMENTS_SCANNED", "documents", false);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index 061c2fed570..1dc6f2d0076 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -166,6 +166,13 @@ public interface BrokerResponse {
*/
boolean isMaxRowsInWindowReached();
+ /**
+ * Returns whether the MSE Lite leaf-stage limit has been reached.
+ */
+ default boolean isMseLiteLeafStageLimitReached() {
+ return false;
+ }
+
/**
* Returns the total time used for query execution in milliseconds.
*/
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
index c9927c537c7..3012c67831d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2.java
@@ -52,7 +52,8 @@ import org.apache.pinot.common.response.ProcessingException;
"explainPlanNumEmptyFilterSegments",
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
"offlineThreadMemAllocatedBytes", "realtimeThreadMemAllocatedBytes",
"offlineResponseSerMemAllocatedBytes",
"realtimeResponseSerMemAllocatedBytes", "offlineTotalMemAllocatedBytes",
"realtimeTotalMemAllocatedBytes",
- "pools", "rlsFiltersApplied", "groupsTrimmed"
+ "pools", "rlsFiltersApplied", "groupsTrimmed",
+ "mseLiteLeafStageLimitReached", "mseLiteLeafStageEffectiveLimit",
"mseLiteFanOutAdjustedLimitApplied"
})
public class BrokerResponseNativeV2 implements BrokerResponse {
private final StatMap<StatKey> _brokerStats = new StatMap<>(StatKey.class);
@@ -94,6 +95,10 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
private Set<Integer> _pools = Set.of();
private boolean _rlsFiltersApplied = false;
+ @Nullable
+ private Integer _mseLiteLeafStageEffectiveLimit;
+ @Nullable
+ private Boolean _mseLiteFanOutAdjustedLimitApplied;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@@ -121,7 +126,7 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
@Override
public boolean isPartialResult() {
return getExceptionsSize() > 0 || isNumGroupsLimitReached() ||
!getEarlyTerminationReasons().isEmpty()
- || isMaxRowsInJoinReached();
+ || isMaxRowsInJoinReached() || isMseLiteLeafStageLimitReached();
}
@Override
@@ -172,6 +177,35 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
_brokerStats.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED,
numGroupsWarningLimitReached);
}
+ public boolean isMseLiteLeafStageLimitReached() {
+ return _brokerStats.getBoolean(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED);
+ }
+
+ public void mergeMseLiteLeafStageLimitReached(boolean
mseLiteLeafStageLimitReached) {
+ _brokerStats.merge(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED,
mseLiteLeafStageLimitReached);
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public Integer getMseLiteLeafStageEffectiveLimit() {
+ return _mseLiteLeafStageEffectiveLimit;
+ }
+
+ public void setMseLiteLeafStageEffectiveLimit(int
mseLiteLeafStageEffectiveLimit) {
+ _mseLiteLeafStageEffectiveLimit = mseLiteLeafStageEffectiveLimit;
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @JsonProperty("mseLiteFanOutAdjustedLimitApplied")
+ @Nullable
+ public Boolean getMseLiteFanOutAdjustedLimitApplied() {
+ return _mseLiteFanOutAdjustedLimitApplied;
+ }
+
+ public void setMseLiteFanOutAdjustedLimitApplied(boolean
mseLiteFanOutAdjustedLimitApplied) {
+ _mseLiteFanOutAdjustedLimitApplied = mseLiteFanOutAdjustedLimitApplied;
+ }
+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public List<String> getEarlyTerminationReasons() {
return
List.copyOf(_brokerStats.getStringSet(StatKey.EARLY_TERMINATION_REASONS));
@@ -515,7 +549,8 @@ public class BrokerResponseNativeV2 implements
BrokerResponse {
return Math.max(value1, value2);
}
},
- EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET);
+ EARLY_TERMINATION_REASONS(StatMap.Type.STRING_SET),
+ LITE_MODE_LEAF_STAGE_LIMIT_REACHED(StatMap.Type.BOOLEAN);
private final StatMap.Type _type;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 3134d97f4bf..a9bbda0ee4c 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -587,6 +587,12 @@ public class QueryOptionsUtils {
: defaultValue;
}
+ @Nullable
+ public static Integer getLiteModeImplicitLeafStageLimit(Map<String, String>
queryOptions) {
+ String val =
queryOptions.get(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT);
+ return val != null ? Integer.parseInt(val) : null;
+ }
+
@Nullable
private static Integer uncheckedParseInt(String optionName, @Nullable String
optionValue) {
if (optionValue == null) {
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java
new file mode 100644
index 00000000000..56579cd129a
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/response/broker/BrokerResponseNativeV2LiteModeTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BrokerResponseNativeV2LiteModeTest {
+
+ @Test
+ public void testLiteModeLeafStageLimitReachedMarksPartialResult() {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ assertFalse(brokerResponse.isMseLiteLeafStageLimitReached());
+ assertFalse(brokerResponse.isPartialResult());
+
+ brokerResponse.mergeMseLiteLeafStageLimitReached(true);
+
+ assertTrue(brokerResponse.isMseLiteLeafStageLimitReached());
+ assertTrue(brokerResponse.isPartialResult());
+ }
+
+ @Test
+ public void testLiteModeLeafStageLimitReachedViaStatMap() {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ brokerResponse.addBrokerStats(new
StatMap<>(BrokerResponseNativeV2.StatKey.class)
+
.merge(BrokerResponseNativeV2.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED,
true));
+
+ assertTrue(brokerResponse.isMseLiteLeafStageLimitReached());
+ assertTrue(brokerResponse.isPartialResult());
+ }
+
+ @Test
+ public void testLiteModeFieldsInJsonSerialization()
+ throws Exception {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ brokerResponse.mergeMseLiteLeafStageLimitReached(true);
+ brokerResponse.setMseLiteLeafStageEffectiveLimit(100);
+ brokerResponse.setMseLiteFanOutAdjustedLimitApplied(true);
+
+ JsonNode json = JsonUtils.objectToJsonNode(brokerResponse);
+
+ assertTrue(json.path("mseLiteLeafStageLimitReached").asBoolean(false));
+ assertEquals(json.path("mseLiteLeafStageEffectiveLimit").asInt(), 100);
+
assertTrue(json.path("mseLiteFanOutAdjustedLimitApplied").asBoolean(false));
+ assertTrue(json.path("partialResult").asBoolean(false));
+ }
+
+ @Test
+ public void testLiteModeFieldsNotSerializedWhenDefault()
+ throws Exception {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+
+ JsonNode json = JsonUtils.objectToJsonNode(brokerResponse);
+
+ assertFalse(json.has("mseLiteLeafStageEffectiveLimit"));
+ assertFalse(json.has("mseLiteFanOutAdjustedLimitApplied"));
+ }
+
+ @Test
+ public void testPartialResultNotSetWhenLimitNotReached() {
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ brokerResponse.setMseLiteLeafStageEffectiveLimit(100);
+
+ assertFalse(brokerResponse.isMseLiteLeafStageLimitReached());
+ assertFalse(brokerResponse.isPartialResult());
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
index 8ccc0b1232e..5ece39b66cb 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
@@ -343,4 +343,20 @@ public class QueryOptionsUtilsTest {
throw new IllegalArgumentException("Unexpected key!");
}
}
+
+ @Test
+ public void testGetLiteModeImplicitLeafStageLimit() {
+ Map<String, String> queryOptions = new HashMap<>();
+
+ // Absent → null
+
assertNull(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions));
+
+ // Present → parsed value
+ queryOptions.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "42");
+
assertEquals(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions),
Integer.valueOf(42));
+
+ // Zero
+ queryOptions.put(LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "0");
+
assertEquals(QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(queryOptions),
Integer.valueOf(0));
+ }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index 5a8927fb4e3..f5de20b2bf3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.operator;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -29,10 +30,13 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.segment.spi.FetchContext;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.accounting.ThreadResourceSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock> {
private static final String EXPLAIN_NAME = "INSTANCE_RESPONSE";
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InstanceResponseOperator.class);
protected final BaseCombineOperator<?> _combineOperator;
protected final List<SegmentContext> _segmentContexts;
@@ -96,6 +100,13 @@ public class InstanceResponseOperator extends
BaseOperator<InstanceResponseBlock
String.valueOf(_threadMemAllocatedBytes));
instanceResponseBlock.addMetadata(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(),
String.valueOf(_systemActivitiesCpuTimeNs));
+ Integer implicitLimit =
QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(
+ _queryContext.getQueryOptions());
+ // false-positive when table has exactly implicitLimit rows
+ if (implicitLimit != null && baseResultsBlock.getNumRows() >=
implicitLimit) {
+ instanceResponseBlock.addMetadata(
+ MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(), "true");
+ }
return instanceResponseBlock;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
index 857c853bcd0..aca7c7ab93b 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/streaming/StreamingInstanceResponseOperator.java
@@ -19,6 +19,8 @@
package org.apache.pinot.core.operator.streaming;
import java.util.List;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.operator.InstanceResponseOperator;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
@@ -66,18 +68,22 @@ public class StreamingInstanceResponseOperator extends
InstanceResponseOperator
prefetchAll();
if (_streamingCombineOperator != null) {
_streamingCombineOperator.start();
+ long totalRowsStreamed = 0;
BaseResultsBlock resultsBlock = getBaseBlock();
while (!(resultsBlock instanceof MetadataResultsBlock)) {
if (resultsBlock instanceof ExceptionResultsBlock) {
return new InstanceResponseBlock(resultsBlock);
}
if (resultsBlock.getNumRows() > 0) {
+ totalRowsStreamed += resultsBlock.getNumRows();
_streamer.send(resultsBlock);
}
resultsBlock = getBaseBlock();
}
// Return a metadata-only block in the end
- return buildInstanceResponseBlock(resultsBlock);
+ InstanceResponseBlock responseBlock =
buildInstanceResponseBlock(resultsBlock);
+ addLiteModeMetadataIfNeeded(responseBlock, totalRowsStreamed);
+ return responseBlock;
} else {
// Handle single block combine operator in streaming fashion
BaseResultsBlock resultsBlock = getBaseBlock();
@@ -115,6 +121,13 @@ public class StreamingInstanceResponseOperator extends
InstanceResponseOperator
return _combineOperator.nextBlock();
}
+ private void addLiteModeMetadataIfNeeded(InstanceResponseBlock
responseBlock, long totalRowsStreamed) {
+ Integer implicitLimit =
QueryOptionsUtils.getLiteModeImplicitLeafStageLimit(_queryContext.getQueryOptions());
+ if (implicitLimit != null && totalRowsStreamed >= implicitLimit) {
+
responseBlock.addMetadata(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(),
"true");
+ }
+ }
+
@Override
public String toExplainString() {
return EXPLAIN_NAME;
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java
new file mode 100644
index 00000000000..9b2351ad476
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/InstanceResponseOperatorTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.core.operator.combine.BaseCombineOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class InstanceResponseOperatorTest {
+
+ @Test
+ public void testLiteModeLeafStageLimitReachedWhenNumRowsEqualsLimit() {
+ Map<String, String> queryOptions = new HashMap<>();
+ queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10");
+
+ SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class);
+ when(resultsBlock.getNumRows()).thenReturn(10);
+ when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>());
+ when(resultsBlock.getNumServerThreads()).thenReturn(1);
+
+ QueryContext queryContext = mock(QueryContext.class);
+ when(queryContext.getQueryOptions()).thenReturn(queryOptions);
+
+ InstanceResponseOperator operator = new InstanceResponseOperator(
+ mock(BaseCombineOperator.class), Collections.emptyList(),
Collections.emptyList(), queryContext);
+
+ InstanceResponseBlock responseBlock =
operator.buildInstanceResponseBlock(resultsBlock);
+
+
assertEquals(responseBlock.getResponseMetadata().get(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName()),
+ "true");
+ }
+
+ @Test
+ public void testLiteModeLeafStageLimitNotReachedWhenNumRowsBelowLimit() {
+ Map<String, String> queryOptions = new HashMap<>();
+ queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10");
+
+ SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class);
+ when(resultsBlock.getNumRows()).thenReturn(5);
+ when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>());
+ when(resultsBlock.getNumServerThreads()).thenReturn(1);
+
+ QueryContext queryContext = mock(QueryContext.class);
+ when(queryContext.getQueryOptions()).thenReturn(queryOptions);
+
+ InstanceResponseOperator operator = new InstanceResponseOperator(
+ mock(BaseCombineOperator.class), Collections.emptyList(),
Collections.emptyList(), queryContext);
+
+ InstanceResponseBlock responseBlock =
operator.buildInstanceResponseBlock(resultsBlock);
+
+ assertFalse(responseBlock.getResponseMetadata()
+
.containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName()));
+ }
+
+ @Test
+ public void testNoLiteModeMetadataWhenOptionAbsent() {
+ Map<String, String> queryOptions = new HashMap<>();
+
+ SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class);
+ when(resultsBlock.getNumRows()).thenReturn(100);
+ when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>());
+ when(resultsBlock.getNumServerThreads()).thenReturn(1);
+
+ QueryContext queryContext = mock(QueryContext.class);
+ when(queryContext.getQueryOptions()).thenReturn(queryOptions);
+
+ InstanceResponseOperator operator = new InstanceResponseOperator(
+ mock(BaseCombineOperator.class), Collections.emptyList(),
Collections.emptyList(), queryContext);
+
+ InstanceResponseBlock responseBlock =
operator.buildInstanceResponseBlock(resultsBlock);
+
+ assertFalse(responseBlock.getResponseMetadata()
+
.containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName()));
+ }
+
+ @Test
+ public void testLiteModeLeafStageLimitReachedWhenNumRowsExceedsLimit() {
+ Map<String, String> queryOptions = new HashMap<>();
+ queryOptions.put(QueryOptionKey.LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT, "10");
+
+ SelectionResultsBlock resultsBlock = mock(SelectionResultsBlock.class);
+ when(resultsBlock.getNumRows()).thenReturn(15);
+ when(resultsBlock.getResultsMetadata()).thenReturn(new HashMap<>());
+ when(resultsBlock.getNumServerThreads()).thenReturn(1);
+
+ QueryContext queryContext = mock(QueryContext.class);
+ when(queryContext.getQueryOptions()).thenReturn(queryOptions);
+
+ InstanceResponseOperator operator = new InstanceResponseOperator(
+ mock(BaseCombineOperator.class), Collections.emptyList(),
Collections.emptyList(), queryContext);
+
+ InstanceResponseBlock responseBlock =
operator.buildInstanceResponseBlock(resultsBlock);
+
+ assertTrue(responseBlock.getResponseMetadata()
+
.containsKey(MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName()));
+ }
+}
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 83b8e4a5f20..2fb657de1fd 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -277,7 +277,13 @@ public class QueryEnvironment {
extraFields.put(RuleTimingPlannerListener.RULE_TIMINGS,
plannerContext.getPlannerOutput().get(RuleTimingPlannerListener.RULE_TIMINGS));
}
- return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames,
extraFields);
+ int liteModeEffectiveSortLimit = -1;
+ PhysicalPlannerContext physicalPlannerContext =
plannerContext.getPhysicalPlannerContext();
+ if (physicalPlannerContext != null) {
+ liteModeEffectiveSortLimit =
physicalPlannerContext.getLiteModeEffectiveSortLimit();
+ }
+ return new QueryPlannerResult(dispatchableSubPlan, explainStr, tableNames,
extraFields,
+ liteModeEffectiveSortLimit);
}
/// @deprecated Use [#compile] and then
[explain][CompiledQuery#explain(long) ] the returned query instead
@@ -354,13 +360,16 @@ public class QueryEnvironment {
private final String _explainPlan;
private final Set<String> _tableNames;
private final Map<String, String> _extraFields;
+ private final int _liteModeEffectiveSortLimit;
QueryPlannerResult(@Nullable DispatchableSubPlan dispatchableSubPlan,
@Nullable String explainPlan,
- Set<String> tableNames, Map<String, String> extraFields) {
+ Set<String> tableNames, Map<String, String> extraFields,
+ int liteModeEffectiveSortLimit) {
_dispatchableSubPlan = dispatchableSubPlan;
_explainPlan = explainPlan;
_tableNames = tableNames;
_extraFields = extraFields;
+ _liteModeEffectiveSortLimit = liteModeEffectiveSortLimit;
}
public String getExplainPlan() {
@@ -378,6 +387,14 @@ public class QueryEnvironment {
public Map<String, String> getExtraFields() {
return _extraFields;
}
+
+ public boolean isLiteModeImplicitSortApplied() {
+ return _liteModeEffectiveSortLimit >= 0;
+ }
+
+ public int getLiteModeEffectiveSortLimit() {
+ return _liteModeEffectiveSortLimit;
+ }
}
// --------------------------------------------------------------------------
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 816190ae719..451f48b1bc2 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -74,6 +74,7 @@ public class PhysicalPlannerContext {
private final boolean _liteModeJoinsEnabled;
@Nullable
private final MultiClusterRoutingContext _multiClusterRoutingContext;
+ private int _liteModeEffectiveSortLimit = -1;
/**
* Used by controller when it needs to extract table names from the query.
@@ -186,6 +187,18 @@ public class PhysicalPlannerContext {
return _liteModeLeafStageFanOutAdjustedLimit;
}
+ public void setLiteModeEffectiveSortLimit(int effectiveLimit) {
+ _liteModeEffectiveSortLimit = effectiveLimit;
+ }
+
+ public boolean isLiteModeImplicitSortApplied() {
+ return _liteModeEffectiveSortLimit >= 0;
+ }
+
+ public int getLiteModeEffectiveSortLimit() {
+ return _liteModeEffectiveSortLimit;
+ }
+
/**
* Gets a random instance id from the registered instances in the context.
* <p>
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
index e8128699b41..7bee7f08440 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LiteModeSortInsertRule.java
@@ -81,6 +81,7 @@ public class LiteModeSortInsertRule extends PRelOptRule {
liteModeLimit);
return sort;
}
+ _context.setLiteModeEffectiveSortLimit(liteModeLimit);
return sort.withFetch(newFetch);
}
if (call._currentNode instanceof PhysicalAggregate) {
@@ -88,7 +89,13 @@ public class LiteModeSortInsertRule extends PRelOptRule {
PhysicalAggregate aggregate = (PhysicalAggregate) call._currentNode;
Preconditions.checkState(aggregate.getLimit() <= liteModeLimit,
"Group trim limit={} exceeds server stage limit={}",
aggregate.getLimit(), liteModeLimit);
- int limit = aggregate.getLimit() > 0 ? aggregate.getLimit() :
liteModeLimit;
+ int limit;
+ if (aggregate.getLimit() > 0) {
+ limit = aggregate.getLimit();
+ } else {
+ limit = liteModeLimit;
+ _context.setLiteModeEffectiveSortLimit(liteModeLimit);
+ }
return aggregate.withLimit(limit);
}
RelCollation relCollation = RelCollations.EMPTY;
@@ -103,6 +110,7 @@ public class LiteModeSortInsertRule extends PRelOptRule {
}
}
PRelNode input = call._currentNode;
+ _context.setLiteModeEffectiveSortLimit(liteModeLimit);
return new PhysicalSort(input.unwrap().getCluster(),
RelTraitSet.createEmpty(), List.of(),
relCollation, null /* offset */, newFetch, input, nodeId(),
input.getPinotDataDistributionOrThrow(),
true);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 289ccc53c75..5ec9d9fd2e2 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -368,6 +368,9 @@ public class LeafOperator extends MultiStageOperator {
case NUM_GROUPS_WARNING_LIMIT_REACHED:
_statMap.merge(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED,
Boolean.parseBoolean(entry.getValue()));
break;
+ case LITE_MODE_LEAF_STAGE_LIMIT_REACHED:
+ _statMap.merge(StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED,
Boolean.parseBoolean(entry.getValue()));
+ break;
case TIME_USED_MS:
_statMap.merge(StatKey.SSE_EXECUTION_TIME_MS,
Long.parseLong(entry.getValue()));
break;
@@ -750,6 +753,7 @@ public class LeafOperator extends MultiStageOperator {
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
+ LITE_MODE_LEAF_STAGE_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_RESIZES(StatMap.Type.INT, null),
RESIZE_TIME_MS(StatMap.Type.LONG, null),
THREAD_CPU_TIME_NS(StatMap.Type.LONG, null),
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
index b56840d05c7..ebcbbb2e13b 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafOperatorTest.java
@@ -647,4 +647,63 @@ public class LeafOperatorTest {
operator.close();
}
+
+ @Test
+ public void shouldPropagateLiteModeLeafStageLimitReached() {
+ // Given:
+ DataSchema schema = new DataSchema(new String[]{"intCol"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+ InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new
MetadataResultsBlock());
+
metadataBlock.getResponseMetadata().put(DataTable.MetadataKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED.getName(),
+ "true");
+ QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(),
metadataBlock);
+ LeafOperator operator =
+ new LeafOperator(OperatorTestUtil.getTracingContext(),
mockQueryRequests(1), schema, queryExecutor,
+ _executorService);
+ _operatorRef.set(operator);
+
+ // When:
+ assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the
metadata block");
+
+ // Then:
+ StatMap<LeafOperator.StatKey> leafStats = operator.copyStatMaps();
+
assertTrue(leafStats.getBoolean(LeafOperator.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED));
+
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats);
+ assertTrue(brokerResponse.isMseLiteLeafStageLimitReached());
+ assertTrue(brokerResponse.isPartialResult());
+ JsonNode responseJson = JsonUtils.objectToJsonNode(brokerResponse);
+
assertTrue(responseJson.path("mseLiteLeafStageLimitReached").asBoolean(false));
+ assertTrue(responseJson.path("partialResult").asBoolean(false));
+
+ operator.close();
+ }
+
+ @Test
+ public void shouldNotSetLiteModeLeafStageLimitWhenNotReached() {
+ // Given:
+ DataSchema schema = new DataSchema(new String[]{"intCol"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+ InstanceResponseBlock metadataBlock = new InstanceResponseBlock(new
MetadataResultsBlock());
+ QueryExecutor queryExecutor = mockQueryExecutor(Collections.emptyList(),
metadataBlock);
+ LeafOperator operator =
+ new LeafOperator(OperatorTestUtil.getTracingContext(),
mockQueryRequests(1), schema, queryExecutor,
+ _executorService);
+ _operatorRef.set(operator);
+
+ // When:
+ assertTrue(operator.nextBlock().isEos(), "Expected EOS after reading the
metadata block");
+
+ // Then:
+ StatMap<LeafOperator.StatKey> leafStats = operator.copyStatMaps();
+
assertFalse(leafStats.getBoolean(LeafOperator.StatKey.LITE_MODE_LEAF_STAGE_LIMIT_REACHED));
+
+ BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
+ MultiStageOperator.Type.LEAF.mergeInto(brokerResponse, leafStats);
+ assertFalse(brokerResponse.isMseLiteLeafStageLimitReached());
+ assertFalse(brokerResponse.isPartialResult());
+
+ operator.close();
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index f29adcc327e..38067ee1614 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -507,6 +507,11 @@ public class DefaultRequestContext implements RequestScope
{
_isNumGroupsLimitReached = numGroupsLimitReached;
}
+ @Override
+ public void setMseLiteLeafStageLimitReached(boolean
mseLiteLeafStageLimitReached) {
+ // No-op: not tracked in default context
+ }
+
@Override
public void setNumExceptions(int numExceptions) {
_numExceptions = numExceptions;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index 95f1cf25ea1..37e413765dc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -186,6 +186,8 @@ public interface RequestContext {
void setNumGroupsLimitReached(boolean numGroupsLimitReached);
+ void setMseLiteLeafStageLimitReached(boolean mseLiteLeafStageLimitReached);
+
void setNumExceptions(int numExceptions);
void setNumRowsResultSet(int numRowsResultSet);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index c93b7d1087e..3b2d69a33b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -981,6 +981,9 @@ public class CommonConstants {
// Server stage limit for lite mode queries.
public static final String LITE_MODE_LEAF_STAGE_LIMIT =
"liteModeLeafStageLimit";
public static final String LITE_MODE_LEAF_STAGE_FANOUT_ADJUSTED_LIMIT
= "liteModeLeafStageFanOutAdjustedLimit";
+ // System-internal option injected by the broker when MSE Lite
implicitly inserts a leaf-stage limit.
+ // Not user-settable; used by servers to detect silent truncation at
execution time.
+ public static final String LITE_MODE_IMPLICIT_LEAF_STAGE_LIMIT =
"liteModeImplicitLeafStageLimit";
// Used by the MSE engine to enable broker-side segment pruning during
routing. The physical optimizer
// path defaults to DEFAULT_USE_BROKER_PRUNING (true); the logical
planner path defaults to
// DEFAULT_LOGICAL_PLANNER_USE_BROKER_PRUNING (false). Both can be
overridden per-query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]