This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a0cba7b2838 Add broker config to enable streaming group-by by default
for a cluster (#18510)
a0cba7b2838 is described below
commit a0cba7b2838c6f90bfe83c8a6dcfdedd33fbd24e
Author: Yash Mayya <[email protected]>
AuthorDate: Wed May 20 21:12:21 2026 -0700
Add broker config to enable streaming group-by by default for a cluster
(#18510)
---
.../MultiStageBrokerRequestHandler.java | 26 +++++++
.../MultiStageBrokerRequestHandlerTest.java | 81 ++++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 10 +++
3 files changed, 117 insertions(+)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 846f6acb3c8..a5ad9811d60 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -139,6 +140,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final Set<String> _defaultDisabledPlannerRules;
protected final long _extraPassiveTimeoutMs;
protected final boolean _enableQueryFingerprinting;
+ @Nullable
+ protected final String _defaultStreamingGroupByFlushThreshold;
protected final PinotMeter _stagesStartedMeter =
BrokerMeter.MSE_STAGES_STARTED.getGlobalMeter();
protected final PinotMeter _stagesFinishedMeter =
BrokerMeter.MSE_STAGES_COMPLETED.getGlobalMeter();
@@ -204,6 +207,13 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
_enableQueryFingerprinting = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_QUERY_FINGERPRINTING,
CommonConstants.Broker.DEFAULT_BROKER_ENABLE_QUERY_FINGERPRINTING);
+ int streamingGroupByFlushThreshold = _config.getProperty(
+
CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD,
+ CommonConstants.Broker.DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD);
+ // Pre-format the threshold once so that we don't allocate a new String on every query when the feature is enabled.
+ // null indicates "feature disabled", which matches the broker-config-unset case.
+ _defaultStreamingGroupByFlushThreshold =
+ streamingGroupByFlushThreshold > 0 ?
Integer.toString(streamingGroupByFlushThreshold) : null;
}
@Override
@@ -398,6 +408,9 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
AtomicBoolean rlsFiltersApplied = new AtomicBoolean(false);
checkAuthorization(requesterIdentity, requestContext, httpHeaders,
compiledQuery, rlsFiltersApplied);
+ // Apply broker-default query options before branching to EXPLAIN/execute so both paths see the same options.
+ applyBrokerDefaultQueryOptions(compiledQuery.getOptions());
+
if (sqlNodeAndOptions.getSqlNode().getKind() == SqlKind.EXPLAIN) {
return explain(compiledQuery, requestId, requestContext, queryTimer);
} else {
@@ -545,6 +558,19 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
.build();
}
+ /**
+ * Applies broker-level defaults for MSE query options. Per-query overrides (i.e. {@code SET option = value} in the
+ * SQL text) always win because we use {@link Map#putIfAbsent} — a user can set the option to {@code 0} to opt out of
+ * a streaming default that the cluster has enabled.
+ */
+ @VisibleForTesting
+ void applyBrokerDefaultQueryOptions(Map<String, String> queryOptions) {
+ if (_defaultStreamingGroupByFlushThreshold != null) {
+
queryOptions.putIfAbsent(CommonConstants.Broker.Request.QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD,
+ _defaultStreamingGroupByFlushThreshold);
+ }
+ }
+
private long getTimeoutMs(Map<String, String> queryOptions) {
Long timeoutMsFromQueryOption =
QueryOptionsUtils.getTimeoutMs(queryOptions);
return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption :
_brokerTimeoutMs;
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
index 7bb7b077610..b4b5a13b5fc 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandlerTest.java
@@ -19,6 +19,8 @@
package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
@@ -37,6 +39,8 @@ import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.env.PinotConfiguration;
import
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
@@ -108,4 +112,81 @@ public class MultiStageBrokerRequestHandlerTest {
Assert.assertNotNull(capturedResponse.get(),
"onQueryCompletion hook must be called with the BrokerResponse from
handleRequest for MSE");
}
+
+ @Test
+ public void
testApplyBrokerDefaultQueryOptionsInjectsStreamingGroupByFlushThreshold()
+ throws Exception {
+ // When the broker config is set, the option is injected for queries that don't already specify it.
+ MultiStageBrokerRequestHandler handler =
newHandlerWithStreamingGroupByFlushThreshold("5000");
+
+ Map<String, String> queryOptions = new HashMap<>();
+ handler.applyBrokerDefaultQueryOptions(queryOptions);
+
Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD),
"5000",
+ "Broker default should be injected when query option is absent");
+ }
+
+ @Test
+ public void testApplyBrokerDefaultQueryOptionsPerQueryOverrideWins()
+ throws Exception {
+ // A per-query SET — including SET = 0 to disable — must take precedence over the broker default.
+ MultiStageBrokerRequestHandler handler =
newHandlerWithStreamingGroupByFlushThreshold("5000");
+
+ Map<String, String> queryOptions = new HashMap<>();
+ queryOptions.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "0");
+ handler.applyBrokerDefaultQueryOptions(queryOptions);
+
Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD),
"0",
+ "Per-query SET = 0 must override the broker default");
+
+ queryOptions.clear();
+ queryOptions.put(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD, "100");
+ handler.applyBrokerDefaultQueryOptions(queryOptions);
+
Assert.assertEquals(queryOptions.get(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD),
"100",
+ "Per-query SET must take precedence over the broker default");
+ }
+
+ @Test
+ public void testApplyBrokerDefaultQueryOptionsNoInjectionWhenConfigUnset()
+ throws Exception {
+ // With the broker config unset (default -1), no option is injected.
+ MultiStageBrokerRequestHandler handler =
newHandlerWithStreamingGroupByFlushThreshold(null);
+
+ Map<String, String> queryOptions = new HashMap<>();
+ handler.applyBrokerDefaultQueryOptions(queryOptions);
+
Assert.assertFalse(queryOptions.containsKey(QueryOptionKey.STREAMING_GROUP_BY_FLUSH_THRESHOLD),
+ "No option should be injected when the broker default is unset");
+ }
+
+ private static MultiStageBrokerRequestHandler
newHandlerWithStreamingGroupByFlushThreshold(
+ @Nullable String streamingGroupByFlushThreshold)
+ throws Exception {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME,
"localhost");
+ config.setProperty(MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
Integer.toString(NetUtils.findOpenPort()));
+ if (streamingGroupByFlushThreshold != null) {
+
config.setProperty(CommonConstants.Broker.CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD,
+ streamingGroupByFlushThreshold);
+ }
+ BrokerQueryEventListenerFactory.init(config);
+ BrokerMetrics.register(mock(BrokerMetrics.class));
+
+ QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
+ when(queryQuotaManager.acquire(anyString())).thenReturn(true);
+ when(queryQuotaManager.acquireDatabase(anyString())).thenReturn(true);
+ when(queryQuotaManager.acquireApplication(anyString())).thenReturn(true);
+
+ return new MultiStageBrokerRequestHandler(config, "testBrokerId", new
BrokerRequestIdGenerator(),
+ mock(RoutingManager.class), new AllowAllAccessControlFactory(),
queryQuotaManager,
+ mock(TableCache.class), mock(MultiStageQueryThrottler.class),
mock(FailureDetector.class),
+ ThreadAccountantUtils.getNoOpAccountant(), null,
mock(WorkerManager.class), mock(WorkerManager.class)) {
+ @Override
+ public void start() {
+ // Skip dispatcher.start() and Calcite warmupCompile — neither is needed for this test.
+ }
+
+ @Override
+ public void shutDown() {
+ // Match start() — no dispatcher was started, so there is nothing to shut down.
+ }
+ };
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ac64394aad0..39b0e22a146 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -507,6 +507,16 @@ public class CommonConstants {
public static final String CONFIG_OF_IGNORE_MISSING_SEGMENTS =
"pinot.broker.query.ignore.missing.segments";
public static final boolean DEFAULT_IGNORE_MISSING_SEGMENTS = false;
+
+ /**
+ * Default flush threshold for the streaming group-by leaf-stage operator on MSE. When positive, the broker
+ * injects this value as the `streamingGroupByFlushThreshold` query option for MSE queries that do not already
+ * specify it, opting the cluster into the streaming group-by behavior by default. Setting the query option
+ * explicitly (including to `0` to disable) always wins over the broker default.
+ */
+ public static final String
CONFIG_OF_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD =
+ "pinot.broker.mse.streaming.group.by.flush.threshold";
+ public static final int DEFAULT_MSE_STREAMING_GROUP_BY_FLUSH_THRESHOLD =
-1;
// Whether to infer partition hint by default or not.
// This value can always be overridden by INFER_PARTITION_HINT query option
public static final String CONFIG_OF_INFER_PARTITION_HINT =
"pinot.broker.multistage.infer.partition.hint";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]