This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 45a058bf734 [multistage] Add Config for Disabling Joins in MSE Lite Mode (#17030)
45a058bf734 is described below
commit 45a058bf734f53b05bd76407a6713418c0ae6958
Author: Shreyaa Sharma <[email protected]>
AuthorDate: Fri Oct 17 21:52:48 2025 +0530
[multistage] Add Config for Disabling Joins in MSE Lite Mode (#17030)
---
.../MultiStageBrokerRequestHandler.java | 4 +
.../org/apache/pinot/query/QueryEnvironment.java | 12 ++-
.../query/context/PhysicalPlannerContext.java | 12 ++-
.../planner/physical/v2/PRelNodeTreeValidator.java | 34 +++++++-
.../v2/validation/LiteModeJoinValidationTest.java | 97 ++++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 9 ++
6 files changed, 162 insertions(+), 6 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 9b40baa0c13..9b420d12a14 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -412,6 +412,9 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
int defaultLiteModeFanoutAdjustedLimit = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_LITE_MODE_LEAF_STAGE_FANOUT_ADJUSTED_LIMIT,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT);
+ boolean defaultLiteModeEnableJoins = _config.getProperty(
+ CommonConstants.Broker.CONFIG_OF_LITE_MODE_ENABLE_JOINS,
+ CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS);
String defaultHashFunction = _config.getProperty(
CommonConstants.Broker.CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION,
CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION);
@@ -437,6 +440,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
.defaultUseBrokerPruning(defaultUseBrokerPruning)
.defaultLiteModeLeafStageLimit(defaultLiteModeLeafStageLimit)
.defaultLiteModeLeafStageFanOutAdjustedLimit(defaultLiteModeFanoutAdjustedLimit)
+ .defaultLiteModeEnableJoins(defaultLiteModeEnableJoins)
.defaultHashFunction(defaultHashFunction)
.build();
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 7b5524f3468..5f881322f72 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -207,7 +207,7 @@ public class QueryEnvironment {
workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(),
_envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(),
_envConfig.defaultUseBrokerPruning(),
_envConfig.defaultLiteModeLeafStageLimit(),
_envConfig.defaultHashFunction(),
- _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit());
+ _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit(),
_envConfig.defaultLiteModeEnableJoins());
}
return new PlannerContext(_config, _catalogReader, _typeFactory,
optProgram, traitProgram,
sqlNodeAndOptions.getOptions(), _envConfig, format,
physicalPlannerContext);
@@ -365,7 +365,7 @@ public class QueryEnvironment {
Preconditions.checkNotNull(plannerContext.getPhysicalPlannerContext(),
"Physical planner context is null");
optimized = RelToPRelConverter.toPRelNode(optimized,
plannerContext.getPhysicalPlannerContext(),
_envConfig.getTableCache()).unwrap();
- PRelNodeTreeValidator.validate((PRelNode) optimized);
+ PRelNodeTreeValidator.validate((PRelNode) optimized,
plannerContext.getPhysicalPlannerContext());
}
return relation.withRel(optimized);
}
@@ -795,6 +795,14 @@ public class QueryEnvironment {
return CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION;
}
+ /**
+ * Whether to enable joins when using MSE Lite mode.
+ */
+ @Value.Default
+ default boolean defaultLiteModeEnableJoins() {
+ return CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
+ }
+
/**
* Returns the worker manager.
*
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 49d433c3cee..3a7fafe9b53 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -70,6 +70,7 @@ public class PhysicalPlannerContext {
private final int _liteModeLeafStageLimit;
private final int _liteModeLeafStageFanOutAdjustedLimit;
private final DistHashFunction _defaultHashFunction;
+ private final boolean _liteModeJoinsEnabled;
/**
* Used by controller when it needs to extract table names from the query.
@@ -88,6 +89,7 @@ public class PhysicalPlannerContext {
_liteModeLeafStageLimit =
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
_liteModeLeafStageFanOutAdjustedLimit =
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT;
_defaultHashFunction =
DistHashFunction.valueOf(KeySelector.DEFAULT_HASH_ALGORITHM.toUpperCase());
+ _liteModeJoinsEnabled =
CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
}
public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
@@ -95,13 +97,14 @@ public class PhysicalPlannerContext {
this(routingManager, hostName, port, requestId, instanceId, queryOptions,
CommonConstants.Broker.DEFAULT_USE_LITE_MODE,
CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
- KeySelector.DEFAULT_HASH_ALGORITHM,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT);
+ KeySelector.DEFAULT_HASH_ALGORITHM,
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
+ CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS);
}
public PhysicalPlannerContext(RoutingManager routingManager, String
hostName, int port, long requestId,
String instanceId, Map<String, String> queryOptions, boolean
defaultUseLiteMode, boolean defaultRunInBroker,
boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit,
String defaultHashFunction,
- int defaultLiteModeLeafStageFanOutAdjustedLimit) {
+ int defaultLiteModeLeafStageFanOutAdjustedLimit, boolean
defaultLiteModeEnableJoins) {
_routingManager = routingManager;
_hostName = hostName;
_port = port;
@@ -117,6 +120,7 @@ public class PhysicalPlannerContext {
defaultLiteModeLeafStageFanOutAdjustedLimit);
_defaultHashFunction =
DistHashFunction.valueOf(defaultHashFunction.toUpperCase());
_instanceIdToQueryServerInstance.put(instanceId,
getBrokerQueryServerInstance());
+ _liteModeJoinsEnabled = defaultLiteModeEnableJoins;
}
public Supplier<Integer> getNodeIdGenerator() {
@@ -156,6 +160,10 @@ public class PhysicalPlannerContext {
return _useLiteMode;
}
+ public boolean isLiteModeJoinsEnabled() {
+ return _liteModeJoinsEnabled;
+ }
+
public boolean isRunInBroker() {
return _runInBroker;
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
index b103696165a..d10c1790a72 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
@@ -22,6 +22,8 @@ import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Window;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
/**
@@ -37,8 +39,36 @@ public class PRelNodeTreeValidator {
* Validate the tree rooted at the given PRelNode. Ideally all issues with the plan should be caught even with an
* EXPLAIN, hence this method should be called as part of query compilation itself.
*/
- public static void validate(PRelNode rootNode) {
- // TODO(mse-physical): Add plan validations here.
+ public static void validate(PRelNode rootNode, PhysicalPlannerContext
context) {
+ if (context == null) {
+ return;
+ }
+ validateLiteModeJoins(rootNode, context);
+ }
+
+ /**
+ * Disables joins in lite mode based on broker-initialized config carried in context.
+ */
+ private static void validateLiteModeJoins(PRelNode rootNode,
PhysicalPlannerContext context) {
+ if (!context.isUseLiteMode() || context.isLiteModeJoinsEnabled()) {
+ return;
+ }
+ // If joins are disabled in lite mode, we need to check if the plan contains joins
+ if (containsJoin(rootNode)) {
+ throw QueryErrorCode.QUERY_PLANNING.asException("Joins are disabled in
lite mode");
+ }
+ }
+
+ private static boolean containsJoin(PRelNode node) {
+ if (node.unwrap() instanceof Join) {
+ return true;
+ }
+ for (PRelNode input : node.getPRelInputs()) {
+ if (containsJoin(input)) {
+ return true;
+ }
+ }
+ return false;
}
/**
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
new file mode 100644
index 00000000000..698413c39fe
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.validation;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PRelNodeTreeValidator;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class LiteModeJoinValidationTest {
+
+ private static PhysicalPlannerContext buildContext(boolean
liteModeJoinsEnabled, boolean useLiteMode) {
+ return new PhysicalPlannerContext(
+ null,
+ "localhost",
+ 8000,
+ 1L,
+ "Broker_localhost",
+ Map.of(),
+ useLiteMode,
+ false,
+ false,
+ CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
+ KeySelector.DEFAULT_HASH_ALGORITHM,
+
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
+ liteModeJoinsEnabled);
+ }
+
+ private static PRelNode makeJoinPlan() {
+ PRelNode node = Mockito.mock(PRelNode.class);
+ Join join = Mockito.mock(Join.class);
+ Mockito.doReturn(join).when(node).unwrap();
+ Mockito.doReturn(List.of()).when(node).getPRelInputs();
+ return node;
+ }
+
+ private static PRelNode makeNoJoinPlan() {
+ PRelNode node = Mockito.mock(PRelNode.class);
+ RelNode rel = Mockito.mock(RelNode.class);
+ Mockito.doReturn(rel).when(node).unwrap();
+ Mockito.doReturn(List.of()).when(node).getPRelInputs();
+ return node;
+ }
+
+ @Test
+ public void testJoinEnabledByDefaultInLiteMode() {
+ PhysicalPlannerContext ctx =
buildContext(CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS, true);
+ PRelNode plan = makeJoinPlan();
+ PRelNodeTreeValidator.validate(plan, ctx);
+ }
+
+ @Test
+ public void testJoinNotAllowedWhenEnabledInLiteMode() {
+ PhysicalPlannerContext ctx = buildContext(false, true);
+ PRelNode plan = makeJoinPlan();
+ try {
+ PRelNodeTreeValidator.validate(plan, ctx);
+ Assert.fail("Expected QUERY_PLANNING error due to joins disabled in lite
mode");
+ } catch (QueryException qe) {
+ Assert.assertEquals(qe.getErrorCode(), QueryErrorCode.QUERY_PLANNING);
+ Assert.assertTrue(qe.getMessage().contains("Joins are disabled in lite
mode"), qe.getMessage());
+ }
+ }
+
+ @Test
+ public void testNoJoinPassesEvenWhenDisabled() {
+ PhysicalPlannerContext ctx = buildContext(false, true);
+ PRelNode plan = makeNoJoinPlan();
+ PRelNodeTreeValidator.validate(plan, ctx);
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ab3ec338840..8769b4d43d4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -571,6 +571,15 @@ public class CommonConstants {
"pinot.broker.multistage.lite.mode.leaf.stage.fanOutAdjustedLimit";
public static final int
DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT = -1;
+ /**
+ * Whether to enable JOIN queries when MSE Lite mode is enabled. By default joins are enabled
+ * in lite mode unless explicitly disabled. This value cannot be overridden by query option.
+ */
+ public static final String CONFIG_OF_LITE_MODE_ENABLE_JOINS =
+ "pinot.broker.multistage.lite.mode.enable.joins";
+ public static final boolean DEFAULT_LITE_MODE_ENABLE_JOINS = true;
+
+
// Config for default hash function used in KeySelector for data shuffling
public static final String CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION =
"pinot.broker.multistage.default.hash.function";
public static final String DEFAULT_BROKER_DEFAULT_HASH_FUNCTION =
"absHashCode";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]