This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 45a058bf734 [multistage] Add Config for Disabling Joins in MSE Lite Mode (#17030)
45a058bf734 is described below

commit 45a058bf734f53b05bd76407a6713418c0ae6958
Author: Shreyaa Sharma <[email protected]>
AuthorDate: Fri Oct 17 21:52:48 2025 +0530

    [multistage] Add Config for Disabling Joins in MSE Lite Mode (#17030)
---
 .../MultiStageBrokerRequestHandler.java            |  4 +
 .../org/apache/pinot/query/QueryEnvironment.java   | 12 ++-
 .../query/context/PhysicalPlannerContext.java      | 12 ++-
 .../planner/physical/v2/PRelNodeTreeValidator.java | 34 +++++++-
 .../v2/validation/LiteModeJoinValidationTest.java  | 97 ++++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  9 ++
 6 files changed, 162 insertions(+), 6 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 9b40baa0c13..9b420d12a14 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -412,6 +412,9 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     int defaultLiteModeFanoutAdjustedLimit = _config.getProperty(
         
CommonConstants.Broker.CONFIG_OF_LITE_MODE_LEAF_STAGE_FANOUT_ADJUSTED_LIMIT,
         
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT);
+    boolean defaultLiteModeEnableJoins = _config.getProperty(
+        CommonConstants.Broker.CONFIG_OF_LITE_MODE_ENABLE_JOINS,
+        CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS);
     String defaultHashFunction = _config.getProperty(
         CommonConstants.Broker.CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION,
         CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION);
@@ -437,6 +440,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         .defaultUseBrokerPruning(defaultUseBrokerPruning)
         .defaultLiteModeLeafStageLimit(defaultLiteModeLeafStageLimit)
         
.defaultLiteModeLeafStageFanOutAdjustedLimit(defaultLiteModeFanoutAdjustedLimit)
+        .defaultLiteModeEnableJoins(defaultLiteModeEnableJoins)
         .defaultHashFunction(defaultHashFunction)
         .build();
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
index 7b5524f3468..5f881322f72 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java
@@ -207,7 +207,7 @@ public class QueryEnvironment {
           workerManager.getInstanceId(), sqlNodeAndOptions.getOptions(),
           _envConfig.defaultUseLiteMode(), _envConfig.defaultRunInBroker(), 
_envConfig.defaultUseBrokerPruning(),
           _envConfig.defaultLiteModeLeafStageLimit(), 
_envConfig.defaultHashFunction(),
-          _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit());
+          _envConfig.defaultLiteModeLeafStageFanOutAdjustedLimit(), 
_envConfig.defaultLiteModeEnableJoins());
     }
     return new PlannerContext(_config, _catalogReader, _typeFactory, 
optProgram, traitProgram,
         sqlNodeAndOptions.getOptions(), _envConfig, format, 
physicalPlannerContext);
@@ -365,7 +365,7 @@ public class QueryEnvironment {
       Preconditions.checkNotNull(plannerContext.getPhysicalPlannerContext(), 
"Physical planner context is null");
       optimized = RelToPRelConverter.toPRelNode(optimized, 
plannerContext.getPhysicalPlannerContext(),
           _envConfig.getTableCache()).unwrap();
-      PRelNodeTreeValidator.validate((PRelNode) optimized);
+      PRelNodeTreeValidator.validate((PRelNode) optimized, 
plannerContext.getPhysicalPlannerContext());
     }
     return relation.withRel(optimized);
   }
@@ -795,6 +795,14 @@ public class QueryEnvironment {
       return CommonConstants.Broker.DEFAULT_BROKER_DEFAULT_HASH_FUNCTION;
     }
 
+      /**
+       * Whether to enable joins when using MSE Lite mode.
+       */
+      @Value.Default
+      default boolean defaultLiteModeEnableJoins() {
+        return CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
+      }
+
     /**
      * Returns the worker manager.
      *
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
index 49d433c3cee..3a7fafe9b53 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PhysicalPlannerContext.java
@@ -70,6 +70,7 @@ public class PhysicalPlannerContext {
   private final int _liteModeLeafStageLimit;
   private final int _liteModeLeafStageFanOutAdjustedLimit;
   private final DistHashFunction _defaultHashFunction;
+  private final boolean _liteModeJoinsEnabled;
 
   /**
    * Used by controller when it needs to extract table names from the query.
@@ -88,6 +89,7 @@ public class PhysicalPlannerContext {
     _liteModeLeafStageLimit = 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT;
     _liteModeLeafStageFanOutAdjustedLimit = 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT;
     _defaultHashFunction = 
DistHashFunction.valueOf(KeySelector.DEFAULT_HASH_ALGORITHM.toUpperCase());
+    _liteModeJoinsEnabled = 
CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS;
   }
 
   public PhysicalPlannerContext(RoutingManager routingManager, String 
hostName, int port, long requestId,
@@ -95,13 +97,14 @@ public class PhysicalPlannerContext {
     this(routingManager, hostName, port, requestId, instanceId, queryOptions,
         CommonConstants.Broker.DEFAULT_USE_LITE_MODE, 
CommonConstants.Broker.DEFAULT_RUN_IN_BROKER,
         CommonConstants.Broker.DEFAULT_USE_BROKER_PRUNING, 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
-        KeySelector.DEFAULT_HASH_ALGORITHM, 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT);
+        KeySelector.DEFAULT_HASH_ALGORITHM, 
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
+        CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS);
   }
 
   public PhysicalPlannerContext(RoutingManager routingManager, String 
hostName, int port, long requestId,
       String instanceId, Map<String, String> queryOptions, boolean 
defaultUseLiteMode, boolean defaultRunInBroker,
       boolean defaultUseBrokerPruning, int defaultLiteModeLeafStageLimit, 
String defaultHashFunction,
-      int defaultLiteModeLeafStageFanOutAdjustedLimit) {
+      int defaultLiteModeLeafStageFanOutAdjustedLimit, boolean 
defaultLiteModeEnableJoins) {
     _routingManager = routingManager;
     _hostName = hostName;
     _port = port;
@@ -117,6 +120,7 @@ public class PhysicalPlannerContext {
         defaultLiteModeLeafStageFanOutAdjustedLimit);
     _defaultHashFunction = 
DistHashFunction.valueOf(defaultHashFunction.toUpperCase());
     _instanceIdToQueryServerInstance.put(instanceId, 
getBrokerQueryServerInstance());
+    _liteModeJoinsEnabled = defaultLiteModeEnableJoins;
   }
 
   public Supplier<Integer> getNodeIdGenerator() {
@@ -156,6 +160,10 @@ public class PhysicalPlannerContext {
     return _useLiteMode;
   }
 
+  public boolean isLiteModeJoinsEnabled() {
+    return _liteModeJoinsEnabled;
+  }
+
   public boolean isRunInBroker() {
     return _runInBroker;
   }
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
index b103696165a..d10c1790a72 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PRelNodeTreeValidator.java
@@ -22,6 +22,8 @@ import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.Window;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
 
 
 /**
@@ -37,8 +39,36 @@ public class PRelNodeTreeValidator {
    * Validate the tree rooted at the given PRelNode. Ideally all issues with 
the plan should be caught even with an
    * EXPLAIN, hence this method should be called as part of query compilation 
itself.
    */
-  public static void validate(PRelNode rootNode) {
-    // TODO(mse-physical): Add plan validations here.
+  public static void validate(PRelNode rootNode, PhysicalPlannerContext 
context) {
+    if (context == null) {
+      return;
+    }
+    validateLiteModeJoins(rootNode, context);
+  }
+
+  /**
+   * Disables joins in lite mode based on broker-initialized config carried in 
context.
+   */
+  private static void validateLiteModeJoins(PRelNode rootNode, 
PhysicalPlannerContext context) {
+    if (!context.isUseLiteMode() || context.isLiteModeJoinsEnabled()) {
+      return;
+    }
+    // If joins are disabled in lite mode, we need to check if the plan 
contains joins
+    if (containsJoin(rootNode)) {
+      throw QueryErrorCode.QUERY_PLANNING.asException("Joins are disabled in 
lite mode");
+    }
+  }
+
+  private static boolean containsJoin(PRelNode node) {
+    if (node.unwrap() instanceof Join) {
+      return true;
+    }
+    for (PRelNode input : node.getPRelInputs()) {
+      if (containsJoin(input)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
new file mode 100644
index 00000000000..698413c39fe
--- /dev/null
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/v2/validation/LiteModeJoinValidationTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2.validation;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Join;
+import org.apache.pinot.query.context.PhysicalPlannerContext;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.physical.v2.PRelNode;
+import org.apache.pinot.query.planner.physical.v2.PRelNodeTreeValidator;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class LiteModeJoinValidationTest {
+
+  private static PhysicalPlannerContext buildContext(boolean 
liteModeJoinsEnabled, boolean useLiteMode) {
+    return new PhysicalPlannerContext(
+        null,
+        "localhost",
+        8000,
+        1L,
+        "Broker_localhost",
+        Map.of(),
+        useLiteMode,
+        false,
+        false,
+        CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_LIMIT,
+        KeySelector.DEFAULT_HASH_ALGORITHM,
+        
CommonConstants.Broker.DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT,
+        liteModeJoinsEnabled);
+  }
+
+  private static PRelNode makeJoinPlan() {
+    PRelNode node = Mockito.mock(PRelNode.class);
+    Join join = Mockito.mock(Join.class);
+    Mockito.doReturn(join).when(node).unwrap();
+    Mockito.doReturn(List.of()).when(node).getPRelInputs();
+    return node;
+  }
+
+  private static PRelNode makeNoJoinPlan() {
+    PRelNode node = Mockito.mock(PRelNode.class);
+    RelNode rel = Mockito.mock(RelNode.class);
+    Mockito.doReturn(rel).when(node).unwrap();
+    Mockito.doReturn(List.of()).when(node).getPRelInputs();
+    return node;
+  }
+
+  @Test
+  public void testJoinEnabledByDefaultInLiteMode() {
+    PhysicalPlannerContext ctx = 
buildContext(CommonConstants.Broker.DEFAULT_LITE_MODE_ENABLE_JOINS, true);
+    PRelNode plan = makeJoinPlan();
+    PRelNodeTreeValidator.validate(plan, ctx);
+  }
+
+  @Test
+  public void testJoinNotAllowedWhenEnabledInLiteMode() {
+    PhysicalPlannerContext ctx = buildContext(false, true);
+    PRelNode plan = makeJoinPlan();
+    try {
+      PRelNodeTreeValidator.validate(plan, ctx);
+      Assert.fail("Expected QUERY_PLANNING error due to joins disabled in lite 
mode");
+    } catch (QueryException qe) {
+      Assert.assertEquals(qe.getErrorCode(), QueryErrorCode.QUERY_PLANNING);
+      Assert.assertTrue(qe.getMessage().contains("Joins are disabled in lite 
mode"), qe.getMessage());
+    }
+  }
+
+  @Test
+  public void testNoJoinPassesEvenWhenDisabled() {
+    PhysicalPlannerContext ctx = buildContext(false, true);
+    PRelNode plan = makeNoJoinPlan();
+    PRelNodeTreeValidator.validate(plan, ctx);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index ab3ec338840..8769b4d43d4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -571,6 +571,15 @@ public class CommonConstants {
         "pinot.broker.multistage.lite.mode.leaf.stage.fanOutAdjustedLimit";
     public static final int 
DEFAULT_LITE_MODE_LEAF_STAGE_FAN_OUT_ADJUSTED_LIMIT = -1;
 
+    /**
+     * Whether to enable JOIN queries when MSE Lite mode is enabled. By 
default joins are enabled
+     * in lite mode unless explicitly disabled. This value cannot be 
overridden by query option.
+     */
+    public static final String CONFIG_OF_LITE_MODE_ENABLE_JOINS =
+        "pinot.broker.multistage.lite.mode.enable.joins";
+    public static final boolean DEFAULT_LITE_MODE_ENABLE_JOINS = true;
+
+
     // Config for default hash function used in KeySelector for data shuffling
     public static final String CONFIG_OF_BROKER_DEFAULT_HASH_FUNCTION = 
"pinot.broker.multistage.default.hash.function";
     public static final String DEFAULT_BROKER_DEFAULT_HASH_FUNCTION = 
"absHashCode";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to