This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d312e5a3f [Multi-stage] Support partition based leaf stage processing 
(#11234)
5d312e5a3f is described below

commit 5d312e5a3f9e7c6edf152e60506c3c7a222c75d2
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Aug 2 10:03:49 2023 -0700

    [Multi-stage] Support partition based leaf stage processing (#11234)
---
 .../apache/calcite/rel/hint/PinotHintOptions.java  |   1 -
 .../rel/rules/PinotJoinExchangeNodeInsertRule.java |  11 +-
 .../rel/rules/PinotJoinToDynamicBroadcastRule.java |  15 +-
 .../planner/logical/RelToPlanNodeConverter.java    |  11 +-
 .../planner/physical/DispatchablePlanMetadata.java |  28 ++-
 .../planner/physical/DispatchablePlanVisitor.java  |   2 +
 .../planner/physical/MailboxAssignmentVisitor.java |  32 ++-
 .../pinot/query/planner/plannode/JoinNode.java     |   9 +-
 .../apache/pinot/query/routing/WorkerManager.java  | 275 ++++++++++-----------
 .../pinot/query/QueryEnvironmentTestBase.java      |  44 +++-
 .../query/testutils/MockRoutingManagerFactory.java |  96 +++----
 .../test/resources/queries/PinotHintablePlans.json |  53 +---
 .../pinot/query/runtime/QueryRunnerTest.java       |  47 ++--
 .../runtime/operator/HashJoinOperatorTest.java     |  32 +--
 .../plan/pipeline/PipelineBreakerExecutorTest.java |   6 +-
 .../runtime/queries/ResourceBasedQueriesTest.java  |  80 +++---
 .../service/dispatch/QueryDispatcherTest.java      |   2 +-
 .../query/service/server/QueryServerTest.java      |   2 +-
 .../testutils/MockInstanceDataManagerFactory.java  |  70 ++++--
 .../src/test/resources/queries/Aggregates.json     |   4 +-
 .../src/test/resources/queries/CountDistinct.json  |  12 +-
 .../src/test/resources/queries/QueryHints.json     |  34 ++-
 .../src/test/resources/queries/SelectHaving.json   |   4 +-
 23 files changed, 454 insertions(+), 416 deletions(-)

diff --git 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
index 1c613074cf..9ffeaf8f8c 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/hint/PinotHintOptions.java
@@ -65,7 +65,6 @@ public class PinotHintOptions {
 
   public static class JoinHintOptions {
     public static final String JOIN_STRATEGY = "join_strategy";
-    public static final String IS_COLOCATED_BY_JOIN_KEYS = 
"is_colocated_by_join_keys";
   }
 
   public static class TableHintOptions {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
index 465aa8b12d..a4b64b9800 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinExchangeNodeInsertRule.java
@@ -25,8 +25,6 @@ import org.apache.calcite.rel.RelDistributions;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.hint.PinotHintOptions;
-import org.apache.calcite.rel.hint.PinotHintStrategyTable;
 import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.logical.PinotLogicalExchange;
 import org.apache.calcite.tools.RelBuilderFactory;
@@ -65,14 +63,7 @@ public class PinotJoinExchangeNodeInsertRule extends 
RelOptRule {
     RelNode rightExchange;
     JoinInfo joinInfo = join.analyzeCondition();
 
-    boolean isColocatedJoin =
-        PinotHintStrategyTable.isHintOptionTrue(join.getHints(), 
PinotHintOptions.JOIN_HINT_OPTIONS,
-            PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
-    if (isColocatedJoin) {
-      // join exchange are colocated, we should directly pass through via join 
key
-      leftExchange = PinotLogicalExchange.create(leftInput, 
RelDistributions.SINGLETON);
-      rightExchange = PinotLogicalExchange.create(rightInput, 
RelDistributions.SINGLETON);
-    } else if (joinInfo.leftKeys.isEmpty()) {
+    if (joinInfo.leftKeys.isEmpty()) {
       // when there's no JOIN key, use broadcast.
       leftExchange = PinotLogicalExchange.create(leftInput, 
RelDistributions.RANDOM_DISTRIBUTED);
       rightExchange = PinotLogicalExchange.create(rightInput, 
RelDistributions.BROADCAST_DISTRIBUTED);
diff --git 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
index b7b2f7f1a6..0c17a32d05 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/calcite/rel/rules/PinotJoinToDynamicBroadcastRule.java
@@ -155,23 +155,16 @@ public class PinotJoinToDynamicBroadcastRule extends 
RelOptRule {
     PinotLogicalExchange right = (PinotLogicalExchange) (join.getRight() 
instanceof HepRelVertex
         ? ((HepRelVertex) join.getRight()).getCurrentRel() : join.getRight());
 
-    boolean isColocatedJoin = 
PinotHintStrategyTable.isHintOptionTrue(join.getHints(),
-        PinotHintOptions.JOIN_HINT_OPTIONS, 
PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
-    PinotLogicalExchange dynamicBroadcastExchange = isColocatedJoin
-        ? PinotLogicalExchange.create(right.getInput(), 
RelDistributions.SINGLETON,
-            PinotRelExchangeType.PIPELINE_BREAKER)
-        : PinotLogicalExchange.create(right.getInput(), 
RelDistributions.BROADCAST_DISTRIBUTED,
+    PinotLogicalExchange dynamicBroadcastExchange =
+        PinotLogicalExchange.create(right.getInput(), 
RelDistributions.BROADCAST_DISTRIBUTED,
             PinotRelExchangeType.PIPELINE_BREAKER);
     Join dynamicFilterJoin =
         new LogicalJoin(join.getCluster(), join.getTraitSet(), 
left.getInput(), dynamicBroadcastExchange,
             join.getCondition(), join.getVariablesSet(), join.getJoinType(), 
join.isSemiJoinDone(),
             ImmutableList.copyOf(join.getSystemFieldList()));
     // adding pass-through exchange after join b/c currently leaf-stage 
doesn't support chaining operator(s) after JOIN
-    // TODO: support pass-through for singleton again when non-colocated.
-    // TODO: this is b/c #10886 alters the singleton exchange and it no longer 
works if join is not colocated.
-    PinotLogicalExchange passThroughAfterJoinExchange = isColocatedJoin
-        ? PinotLogicalExchange.create(dynamicFilterJoin, 
RelDistributions.SINGLETON)
-        : PinotLogicalExchange.create(dynamicFilterJoin, 
RelDistributions.hash(join.analyzeCondition().leftKeys));
+    PinotLogicalExchange passThroughAfterJoinExchange =
+        PinotLogicalExchange.create(dynamicFilterJoin, 
RelDistributions.hash(join.analyzeCondition().leftKeys));
     call.transformTo(passThroughAfterJoinExchange);
   }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
index d5097e7ed7..a4e6be355a 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java
@@ -31,8 +31,6 @@ import org.apache.calcite.rel.core.JoinInfo;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.SortExchange;
-import org.apache.calcite.rel.hint.PinotHintOptions;
-import org.apache.calcite.rel.hint.PinotHintStrategyTable;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalFilter;
 import org.apache.calcite.rel.logical.LogicalJoin;
@@ -134,8 +132,8 @@ public final class RelToPlanNodeConverter {
     // Compute all the tables involved under this exchange node
     Set<String> tableNames = getTableNamesFromRelRoot(node);
 
-    return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), 
exchangeType,
-        tableNames, node.getDistribution(), fieldCollations, isSortOnSender, 
isSortOnReceiver);
+    return new ExchangeNode(currentStageId, toDataSchema(node.getRowType()), 
exchangeType, tableNames,
+        node.getDistribution(), fieldCollations, isSortOnSender, 
isSortOnReceiver);
   }
 
   private static PlanNode convertLogicalSetOp(SetOp node, int currentStageId) {
@@ -187,11 +185,8 @@ public final class RelToPlanNodeConverter {
         new FieldSelectionKeySelector(joinInfo.rightKeys));
     List<RexExpression> joinClause =
         
joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
-    boolean isColocatedJoin =
-        PinotHintStrategyTable.isHintOptionTrue(node.getHints(), 
PinotHintOptions.JOIN_HINT_OPTIONS,
-            PinotHintOptions.JoinHintOptions.IS_COLOCATED_BY_JOIN_KEYS);
     return new JoinNode(currentStageId, toDataSchema(node.getRowType()), 
toDataSchema(node.getLeft().getRowType()),
-        toDataSchema(node.getRight().getRowType()), joinType, joinKeys, 
joinClause, isColocatedJoin);
+        toDataSchema(node.getRight().getRowType()), joinType, joinKeys, 
joinClause);
   }
 
   private static DataSchema toDataSchema(RelDataType rowType) {
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
index 7e0e1cdab1..f53cc008f8 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanMetadata.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.routing.MailboxMetadata;
 import org.apache.pinot.query.routing.QueryServerInstance;
@@ -41,7 +42,9 @@ import org.apache.pinot.query.routing.QueryServerInstance;
  * </ul>
  */
 public class DispatchablePlanMetadata implements Serializable {
+  // These 2 fields are extracted from TableScanNode
   private final List<String> _scannedTables;
+  private Map<String, String> _tableOptions;
 
   // used for assigning server/worker nodes.
   private Map<QueryServerInstance, List<Integer>> _serverInstanceToWorkerIdMap;
@@ -64,6 +67,9 @@ public class DispatchablePlanMetadata implements Serializable 
{
   // whether a stage requires singleton instance to execute, e.g. stage 
contains global reduce (sort/agg) operator.
   private boolean _requiresSingletonInstance;
 
+  // whether a stage is partitioned table scan
+  private boolean _isPartitionedTableScan;
+
   // Total worker count of this stage.
   private int _totalWorkerCount;
 
@@ -72,8 +78,6 @@ public class DispatchablePlanMetadata implements Serializable 
{
     _serverInstanceToWorkerIdMap = new HashMap<>();
     _workerIdToSegmentsMap = new HashMap<>();
     _workerIdToMailboxesMap = new HashMap<>();
-    _timeBoundaryInfo = null;
-    _requiresSingletonInstance = false;
     _tableToUnavailableSegmentsMap = new HashMap<>();
   }
 
@@ -85,6 +89,15 @@ public class DispatchablePlanMetadata implements 
Serializable {
     _scannedTables.add(tableName);
   }
 
+  @Nullable
+  public Map<String, String> getTableOptions() {
+    return _tableOptions;
+  }
+
+  public void setTableOptions(Map<String, String> tableOptions) {
+    _tableOptions = tableOptions;
+  }
+
   // -----------------------------------------------
   // attached physical plan context.
   // -----------------------------------------------
@@ -93,8 +106,7 @@ public class DispatchablePlanMetadata implements 
Serializable {
     return _workerIdToSegmentsMap;
   }
 
-  public void setWorkerIdToSegmentsMap(
-      Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap) {
+  public void setWorkerIdToSegmentsMap(Map<Integer, Map<String, List<String>>> 
workerIdToSegmentsMap) {
     _workerIdToSegmentsMap = workerIdToSegmentsMap;
   }
 
@@ -135,6 +147,14 @@ public class DispatchablePlanMetadata implements 
Serializable {
     _requiresSingletonInstance = _requiresSingletonInstance || 
newRequireInstance;
   }
 
+  public boolean isPartitionedTableScan() {
+    return _isPartitionedTableScan;
+  }
+
+  public void setPartitionedTableScan(boolean isPartitionedTableScan) {
+    _isPartitionedTableScan = isPartitionedTableScan;
+  }
+
   public int getTotalWorkerCount() {
     return _totalWorkerCount;
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
index ccb4622b8b..d177a1d892 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.planner.physical;
 
+import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.pinot.query.planner.plannode.AggregateNode;
 import org.apache.pinot.query.planner.plannode.ExchangeNode;
 import org.apache.pinot.query.planner.plannode.FilterNode;
@@ -127,6 +128,7 @@ public class DispatchablePlanVisitor implements 
PlanNodeVisitor<Void, Dispatchab
   public Void visitTableScan(TableScanNode node, DispatchablePlanContext 
context) {
     DispatchablePlanMetadata dispatchablePlanMetadata = 
getOrCreateDispatchablePlanMetadata(node, context);
     dispatchablePlanMetadata.addScannedTable(node.getTableName());
+    
dispatchablePlanMetadata.setTableOptions(node.getNodeHint()._hintOptions.get(PinotHintOptions.TABLE_HINT_OPTIONS));
     return null;
   }
 
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 180f5a413a..bc6cb7c83e 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -59,9 +59,39 @@ public class MailboxAssignmentVisitor extends 
DefaultPostOrderTraversalVisitor<V
             receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(senderFragmentId, mailboxMetadata);
           }
         });
+      } else if (senderMetadata.isPartitionedTableScan()) {
+        // For partitioned table scan, send the data to the worker with the 
same worker id (not necessarily the same
+        // instance)
+        // TODO: Support further splitting a single partition across multiple 
workers
+        senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
+          for (int workerId : senderWorkerIds) {
+            receiverWorkerIdsMap.forEach((receiverServerInstance, 
receiverWorkerIds) -> {
+              for (int receiverWorkerId : receiverWorkerIds) {
+                if (receiverWorkerId == workerId) {
+                  String mailboxId =
+                      MailboxIdUtils.toPlanMailboxId(senderFragmentId, 
workerId, receiverFragmentId, workerId);
+                  MailboxMetadata serderMailboxMetadata = new 
MailboxMetadata(Collections.singletonList(mailboxId),
+                      Collections.singletonList(new 
VirtualServerAddress(receiverServerInstance, workerId)),
+                      Collections.emptyMap());
+                  MailboxMetadata receiverMailboxMetadata = new 
MailboxMetadata(Collections.singletonList(mailboxId),
+                      Collections.singletonList(new 
VirtualServerAddress(senderServerInstance, workerId)),
+                      Collections.emptyMap());
+                  senderMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>())
+                      .put(receiverFragmentId, serderMailboxMetadata);
+                  receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>())
+                      .put(senderFragmentId, receiverMailboxMetadata);
+                  break;
+                }
+              }
+            });
+          }
+        });
       } else {
         // For other exchange types, send the data to all the instances in the 
receiver fragment
-        // TODO: Add support for more exchange types
+        // TODO:
+        //   1. Add support for more exchange types
+        //   2. Keep the receiver worker id sequential in the 
senderMailboxMetadata so that the partitionId aligns with
+        //      the workerId. It is useful for JOIN query when only left table 
is partitioned.
         senderWorkerIdsMap.forEach((senderServerInstance, senderWorkerIds) -> {
           for (int senderWorkerId : senderWorkerIds) {
             Map<Integer, MailboxMetadata> senderMailboxMetadataMap =
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
index b0f576258c..6d089c6239 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/plannode/JoinNode.java
@@ -36,8 +36,6 @@ public class JoinNode extends AbstractPlanNode {
   @ProtoProperties
   private List<RexExpression> _joinClause;
   @ProtoProperties
-  private boolean _isColocatedJoin;
-  @ProtoProperties
   private List<String> _leftColumnNames;
   @ProtoProperties
   private List<String> _rightColumnNames;
@@ -47,14 +45,13 @@ public class JoinNode extends AbstractPlanNode {
   }
 
   public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema 
leftSchema, DataSchema rightSchema,
-      JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> 
joinClause, boolean isColocatedJoin) {
+      JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> 
joinClause) {
     super(planFragmentId, dataSchema);
     _leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
     _rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
     _joinRelType = joinRelType;
     _joinKeys = joinKeys;
     _joinClause = joinClause;
-    _isColocatedJoin = isColocatedJoin;
   }
 
   public JoinRelType getJoinRelType() {
@@ -69,10 +66,6 @@ public class JoinNode extends AbstractPlanNode {
     return _joinClause;
   }
 
-  public boolean isColocatedJoin() {
-    return _isColocatedJoin;
-  }
-
   public List<String> getLeftColumnNames() {
     return _leftColumnNames;
   }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 48506ecfab..5fdb67c7c8 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.calcite.rel.hint.PinotHintOptions;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
@@ -39,8 +40,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.PlanFragment;
 import org.apache.pinot.query.planner.physical.DispatchablePlanContext;
 import org.apache.pinot.query.planner.physical.DispatchablePlanMetadata;
-import org.apache.pinot.query.planner.plannode.JoinNode;
-import org.apache.pinot.query.planner.plannode.PlanNode;
+import org.apache.pinot.query.planner.plannode.TableScanNode;
 import org.apache.pinot.spi.config.table.TableType;
 import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -96,19 +96,42 @@ public class WorkerManager {
   }
 
   private void assignWorkersToLeafFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
+    // NOTE: For pipeline breaker, leaf fragment can also have children
+    for (PlanFragment child : fragment.getChildren()) {
+      assignWorkersToNonRootFragment(child, context);
+    }
+
     DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
-    // table scan stage, need to attach server as well as segment info for 
each physical table type.
-    List<String> scannedTables = metadata.getScannedTables();
-    String logicalTableName = scannedTables.get(0);
-    Map<String, RoutingTable> routingTableMap = 
getRoutingTable(logicalTableName, context.getRequestId());
-    if (routingTableMap.size() == 0) {
-      throw new IllegalArgumentException("Unable to find routing entries for 
table: " + logicalTableName);
+    Map<String, String> tableOptions = metadata.getTableOptions();
+    String partitionKey = null;
+    int numPartitions = 0;
+    if (tableOptions != null) {
+      partitionKey = 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_KEY);
+      String partitionSize = 
tableOptions.get(PinotHintOptions.TableHintOptions.PARTITION_SIZE);
+      if (partitionSize != null) {
+        numPartitions = Integer.parseInt(partitionSize);
+      }
+    }
+    if (partitionKey == null) {
+      assignWorkersToNonPartitionedLeafFragment(metadata, context);
+    } else {
+      Preconditions.checkState(numPartitions > 0, "'%s' must be provided for 
partition key: %s",
+          PinotHintOptions.TableHintOptions.PARTITION_SIZE, partitionKey);
+      assignWorkersToPartitionedLeafFragment(metadata, context, partitionKey, 
numPartitions);
     }
+  }
+
+  private void 
assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata metadata,
+      DispatchablePlanContext context) {
+    String tableName = metadata.getScannedTables().get(0);
+    Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName, 
context.getRequestId());
+    Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find 
routing entries for table: %s", tableName);
+
     // acquire time boundary info if it is a hybrid table.
     if (routingTableMap.size() > 1) {
       TimeBoundaryInfo timeBoundaryInfo = _routingManager.getTimeBoundaryInfo(
           TableNameBuilder.forType(TableType.OFFLINE)
-              
.tableNameWithType(TableNameBuilder.extractRawTableName(logicalTableName)));
+              
.tableNameWithType(TableNameBuilder.extractRawTableName(tableName)));
       if (timeBoundaryInfo != null) {
         metadata.setTimeBoundaryInfo(timeBoundaryInfo);
       } else {
@@ -133,7 +156,7 @@ public class WorkerManager {
 
       // attach unavailable segments to metadata
       if (!routingTable.getUnavailableSegments().isEmpty()) {
-        metadata.addTableToUnavailableSegmentsMap(logicalTableName, 
routingTable.getUnavailableSegments());
+        metadata.addTableToUnavailableSegmentsMap(tableName, 
routingTable.getUnavailableSegments());
       }
     }
     int globalIdx = 0;
@@ -148,22 +171,17 @@ public class WorkerManager {
     metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
     metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
     metadata.setTotalWorkerCount(globalIdx);
-
-    // NOTE: For pipeline breaker, leaf fragment can also have children
-    for (PlanFragment child : fragment.getChildren()) {
-      assignWorkersToNonRootFragment(child, context);
-    }
   }
 
   /**
-   * Acquire routing table for items listed in {@link 
org.apache.pinot.query.planner.plannode.TableScanNode}.
+   * Acquire routing table for items listed in {@link TableScanNode}.
    *
-   * @param logicalTableName it can either be a hybrid table name or a 
physical table name with table type.
+   * @param tableName table name with or without type suffix.
    * @return keyed-map from table type(s) to routing table(s).
    */
-  private Map<String, RoutingTable> getRoutingTable(String logicalTableName, 
long requestId) {
-    String rawTableName = 
TableNameBuilder.extractRawTableName(logicalTableName);
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(logicalTableName);
+  private Map<String, RoutingTable> getRoutingTable(String tableName, long 
requestId) {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     Map<String, RoutingTable> routingTableMap = new HashMap<>();
     RoutingTable routingTable;
     if (tableType == null) {
@@ -176,7 +194,7 @@ public class WorkerManager {
         routingTableMap.put(TableType.REALTIME.name(), routingTable);
       }
     } else {
-      routingTable = getRoutingTable(logicalTableName, tableType, requestId);
+      routingTable = getRoutingTable(tableName, tableType, requestId);
       if (routingTable != null) {
         routingTableMap.put(tableType.name(), routingTable);
       }
@@ -191,15 +209,60 @@ public class WorkerManager {
         CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
tableNameWithType), requestId);
   }
 
+  private void assignWorkersToPartitionedLeafFragment(DispatchablePlanMetadata 
metadata,
+      DispatchablePlanContext context, String partitionKey, int numPartitions) 
{
+    String tableName = metadata.getScannedTables().get(0);
+    ColocatedTableInfo colocatedTableInfo = getColocatedTableInfo(tableName, 
partitionKey, numPartitions);
+
+    // Pick one server per partition
+    // NOTE: Pick server based on the request id so that the same server is 
picked across different table scan when the
+    //       segments for the same partition are colocated
+    long indexToPick = context.getRequestId();
+    ColocatedPartitionInfo[] partitionInfoMap = 
colocatedTableInfo._partitionInfoMap;
+    int nextWorkerId = 0;
+    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new 
HashMap<>();
+    Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new 
HashMap<>();
+    Map<String, ServerInstance> enabledServerInstanceMap = 
_routingManager.getEnabledServerInstanceMap();
+    for (int i = 0; i < numPartitions; i++) {
+      ColocatedPartitionInfo partitionInfo = partitionInfoMap[i];
+      // TODO: Currently we don't support the case when a partition doesn't 
contain any segment. The reason is that the
+      //       leaf stage won't be able to directly return empty response.
+      Preconditions.checkState(partitionInfo != null, "Failed to find any 
segment for table: %s, partition: %s",
+          tableName, i);
+      ServerInstance serverInstance =
+          pickEnabledServer(partitionInfo._fullyReplicatedServers, 
enabledServerInstanceMap, indexToPick++);
+      Preconditions.checkState(serverInstance != null,
+          "Failed to find enabled fully replicated server for table: %s, 
partition: %s", tableName, i);
+      QueryServerInstance queryServerInstance = new 
QueryServerInstance(serverInstance);
+      int workerId = nextWorkerId++;
+      serverInstanceToWorkerIdMap.computeIfAbsent(queryServerInstance, k -> 
new ArrayList<>()).add(workerId);
+      workerIdToSegmentsMap.put(workerId, getSegmentsMap(partitionInfo));
+    }
+
+    metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
+    metadata.setWorkerIdToSegmentsMap(workerIdToSegmentsMap);
+    metadata.setTotalWorkerCount(nextWorkerId);
+    metadata.setTimeBoundaryInfo(colocatedTableInfo._timeBoundaryInfo);
+    metadata.setPartitionedTableScan(true);
+  }
+
   private void assignWorkersToIntermediateFragment(PlanFragment fragment, 
DispatchablePlanContext context) {
-    if (isColocatedJoin(fragment.getFragmentRoot())) {
-      // TODO: Make it more general so that it can be used for other 
partitioned cases (e.g. group-by, window function)
-      try {
-        assignWorkersForColocatedJoin(fragment, context);
+    List<PlanFragment> children = fragment.getChildren();
+    for (PlanFragment child : children) {
+      assignWorkersToNonRootFragment(child, context);
+    }
+
+    Map<Integer, DispatchablePlanMetadata> metadataMap = 
context.getDispatchablePlanMetadataMap();
+    DispatchablePlanMetadata metadata = 
metadataMap.get(fragment.getFragmentId());
+
+    // If the first child is partitioned table scan, use the same worker 
assignment to avoid shuffling data
+    // TODO: Introduce a hint to control this
+    if (children.size() > 0) {
+      DispatchablePlanMetadata firstChildMetadata = 
metadataMap.get(children.get(0).getFragmentId());
+      if (firstChildMetadata.isPartitionedTableScan()) {
+        
metadata.setServerInstanceToWorkerIdMap(firstChildMetadata.getServerInstanceToWorkerIdMap());
+        metadata.setTotalWorkerCount(firstChildMetadata.getTotalWorkerCount());
         return;
-      } catch (Exception e) {
-        LOGGER.warn("[RequestId: {}] Caught exception while assigning workers 
for colocated join, "
-            + "falling back to regular worker assignment", 
context.getRequestId(), e);
       }
     }
 
@@ -224,7 +287,6 @@ public class WorkerManager {
       throw new IllegalStateException(
           "No server instance found for intermediate stage for tables: " + 
Arrays.toString(tableNames.toArray()));
     }
-    DispatchablePlanMetadata metadata = 
context.getDispatchablePlanMetadataMap().get(fragment.getFragmentId());
     Map<String, String> options = context.getPlannerContext().getOptions();
     int stageParallelism = 
Integer.parseInt(options.getOrDefault(QueryOptionKey.STAGE_PARALLELISM, "1"));
     if (metadata.isRequiresSingletonInstance()) {
@@ -246,101 +308,9 @@ public class WorkerManager {
       metadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
       metadata.setTotalWorkerCount(nextWorkerId);
     }
-
-    for (PlanFragment child : fragment.getChildren()) {
-      assignWorkersToNonRootFragment(child, context);
-    }
-  }
-
-  private boolean isColocatedJoin(PlanNode planNode) {
-    if (planNode instanceof JoinNode) {
-      return ((JoinNode) planNode).isColocatedJoin();
-    }
-    for (PlanNode child : planNode.getInputs()) {
-      if (isColocatedJoin(child)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private void assignWorkersForColocatedJoin(PlanFragment fragment, 
DispatchablePlanContext context) {
-    List<PlanFragment> children = fragment.getChildren();
-    Preconditions.checkArgument(children.size() == 2, "Expecting 2 children, 
find: %s", children.size());
-    PlanFragment leftFragment = children.get(0);
-    PlanFragment rightFragment = children.get(1);
-    Map<Integer, DispatchablePlanMetadata> metadataMap = 
context.getDispatchablePlanMetadataMap();
-    // TODO: Support multi-level colocated join (more than 2 tables colocated)
-    DispatchablePlanMetadata leftMetadata = 
metadataMap.get(leftFragment.getFragmentId());
-    Preconditions.checkArgument(isLeafPlan(leftMetadata), "Left side is not 
leaf");
-    DispatchablePlanMetadata rightMetadata = 
metadataMap.get(rightFragment.getFragmentId());
-    Preconditions.checkArgument(isLeafPlan(rightMetadata), "Right side is not 
leaf");
-
-    String leftTable = leftMetadata.getScannedTables().get(0);
-    String rightTable = rightMetadata.getScannedTables().get(0);
-    ColocatedTableInfo leftColocatedTableInfo = 
getColocatedTableInfo(leftTable);
-    ColocatedTableInfo rightColocatedTableInfo = 
getColocatedTableInfo(rightTable);
-    ColocatedPartitionInfo[] leftPartitionInfoMap = 
leftColocatedTableInfo._partitionInfoMap;
-    ColocatedPartitionInfo[] rightPartitionInfoMap = 
rightColocatedTableInfo._partitionInfoMap;
-    // TODO: Support colocated join when both side have different number of 
partitions (e.g. left: 8, right: 16)
-    int numPartitions = leftPartitionInfoMap.length;
-    Preconditions.checkState(numPartitions == rightPartitionInfoMap.length,
-        "Got different number of partitions in left table: %s (%s) and right 
table: %s (%s)", leftTable, numPartitions,
-        rightTable, rightPartitionInfoMap.length);
-
-    // Pick one server per partition
-    int nextWorkerId = 0;
-    Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap = new 
HashMap<>();
-    Map<Integer, Map<String, List<String>>> leftWorkerIdToSegmentsMap = new 
HashMap<>();
-    Map<Integer, Map<String, List<String>>> rightWorkerIdToSegmentsMap = new 
HashMap<>();
-    Map<String, ServerInstance> enabledServerInstanceMap = 
_routingManager.getEnabledServerInstanceMap();
-    for (int i = 0; i < numPartitions; i++) {
-      ColocatedPartitionInfo leftPartitionInfo = leftPartitionInfoMap[i];
-      ColocatedPartitionInfo rightPartitionInfo = rightPartitionInfoMap[i];
-      if (leftPartitionInfo == null && rightPartitionInfo == null) {
-        continue;
-      }
-      // TODO: Currently we don't support the case when for a partition only 
one side has segments. The reason is that
-      //       the leaf stage won't be able to directly return empty response.
-      Preconditions.checkState(leftPartitionInfo != null && rightPartitionInfo 
!= null,
-          "One side doesn't have any segment for partition: %s", i);
-      Set<String> candidates = new 
HashSet<>(leftPartitionInfo._fullyReplicatedServers);
-      candidates.retainAll(rightPartitionInfo._fullyReplicatedServers);
-      ServerInstance serverInstance = pickRandomEnabledServer(candidates, 
enabledServerInstanceMap);
-      Preconditions.checkState(serverInstance != null,
-          "Failed to find enabled fully replicated server for partition: %s in 
table: %s and %s", i, leftTable,
-          rightTable);
-      QueryServerInstance queryServerInstance = new 
QueryServerInstance(serverInstance);
-      int workerId = nextWorkerId++;
-      serverInstanceToWorkerIdMap.computeIfAbsent(queryServerInstance, k -> 
new ArrayList<>()).add(workerId);
-      leftWorkerIdToSegmentsMap.put(workerId, 
getSegmentsMap(leftPartitionInfo));
-      rightWorkerIdToSegmentsMap.put(workerId, 
getSegmentsMap(rightPartitionInfo));
-    }
-
-    DispatchablePlanMetadata joinMetadata = 
metadataMap.get(fragment.getFragmentId());
-    joinMetadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
-    joinMetadata.setTotalWorkerCount(nextWorkerId);
-
-    leftMetadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
-    leftMetadata.setWorkerIdToSegmentsMap(leftWorkerIdToSegmentsMap);
-    leftMetadata.setTotalWorkerCount(nextWorkerId);
-    leftMetadata.setTimeBoundaryInfo(leftColocatedTableInfo._timeBoundaryInfo);
-
-    rightMetadata.setServerInstanceToWorkerIdMap(serverInstanceToWorkerIdMap);
-    rightMetadata.setWorkerIdToSegmentsMap(rightWorkerIdToSegmentsMap);
-    rightMetadata.setTotalWorkerCount(nextWorkerId);
-    
rightMetadata.setTimeBoundaryInfo(rightColocatedTableInfo._timeBoundaryInfo);
-
-    // NOTE: For pipeline breaker, leaf fragment can also have children
-    for (PlanFragment child : leftFragment.getChildren()) {
-      assignWorkersToNonRootFragment(child, context);
-    }
-    for (PlanFragment child : rightFragment.getChildren()) {
-      assignWorkersToNonRootFragment(child, context);
-    }
   }
 
-  private ColocatedTableInfo getColocatedTableInfo(String tableName) {
+  private ColocatedTableInfo getColocatedTableInfo(String tableName, String 
partitionKey, int numPartitions) {
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType == null) {
       String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(tableName);
@@ -354,14 +324,12 @@ public class WorkerManager {
         TimeBoundaryInfo timeBoundaryInfo = 
_routingManager.getTimeBoundaryInfo(offlineTableName);
         // Ignore OFFLINE side when time boundary info is unavailable
         if (timeBoundaryInfo == null) {
-          return getRealtimeColocatedTableInfo(realtimeTableName);
+          return getRealtimeColocatedTableInfo(realtimeTableName, 
partitionKey, numPartitions);
         }
-        PartitionInfo[] offlinePartitionInfoMap = 
getTablePartitionInfo(offlineTableName).getPartitionInfoMap();
-        PartitionInfo[] realtimePartitionInfoMap = 
getTablePartitionInfo(realtimeTableName).getPartitionInfoMap();
-        int numPartitions = offlinePartitionInfoMap.length;
-        Preconditions.checkState(numPartitions == 
realtimePartitionInfoMap.length,
-            "Got different number of partitions in OFFLINE side: %s and 
REALTIME side: %s of table: %s", numPartitions,
-            realtimePartitionInfoMap.length, tableName);
+        PartitionInfo[] offlinePartitionInfoMap =
+            getTablePartitionInfo(offlineTableName, partitionKey, 
numPartitions).getPartitionInfoMap();
+        PartitionInfo[] realtimePartitionInfoMap =
+            getTablePartitionInfo(realtimeTableName, partitionKey, 
numPartitions).getPartitionInfoMap();
         ColocatedPartitionInfo[] colocatedPartitionInfoMap = new 
ColocatedPartitionInfo[numPartitions];
         for (int i = 0; i < numPartitions; i++) {
           PartitionInfo offlinePartitionInfo = offlinePartitionInfoMap[i];
@@ -391,32 +359,39 @@ public class WorkerManager {
         }
         return new ColocatedTableInfo(colocatedPartitionInfoMap, 
timeBoundaryInfo);
       } else if (offlineRoutingExists) {
-        return getOfflineColocatedTableInfo(offlineTableName);
+        return getOfflineColocatedTableInfo(offlineTableName, partitionKey, 
numPartitions);
       } else {
-        return getRealtimeColocatedTableInfo(realtimeTableName);
+        return getRealtimeColocatedTableInfo(realtimeTableName, partitionKey, 
numPartitions);
       }
     } else {
       if (tableType == TableType.OFFLINE) {
-        return getOfflineColocatedTableInfo(tableName);
+        return getOfflineColocatedTableInfo(tableName, partitionKey, 
numPartitions);
       } else {
-        return getRealtimeColocatedTableInfo(tableName);
+        return getRealtimeColocatedTableInfo(tableName, partitionKey, 
numPartitions);
       }
     }
   }
 
-  private TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
+  private TablePartitionInfo getTablePartitionInfo(String tableNameWithType, 
String partitionKey, int numPartitions) {
     TablePartitionInfo tablePartitionInfo = 
_routingManager.getTablePartitionInfo(tableNameWithType);
     Preconditions.checkState(tablePartitionInfo != null, "Failed to find table 
partition info for table: %s",
         tableNameWithType);
+    
Preconditions.checkState(tablePartitionInfo.getPartitionColumn().equals(partitionKey),
+        "Partition key: %s does not match partition column: %s for table: %s", 
partitionKey,
+        tablePartitionInfo.getPartitionColumn(), tableNameWithType);
+    Preconditions.checkState(tablePartitionInfo.getNumPartitions() == 
numPartitions,
+        "Partition size mismatch (hint: %s, table: %s) for table: %s", 
numPartitions,
+        tablePartitionInfo.getNumPartitions(), tableNameWithType);
     
Preconditions.checkState(tablePartitionInfo.getSegmentsWithInvalidPartition().isEmpty(),
         "Find %s segments with invalid partition for table: %s",
         tablePartitionInfo.getSegmentsWithInvalidPartition().size(), 
tableNameWithType);
     return tablePartitionInfo;
   }
 
-  private ColocatedTableInfo getOfflineColocatedTableInfo(String 
offlineTableName) {
-    PartitionInfo[] partitionInfoMap = 
getTablePartitionInfo(offlineTableName).getPartitionInfoMap();
-    int numPartitions = partitionInfoMap.length;
+  private ColocatedTableInfo getOfflineColocatedTableInfo(String 
offlineTableName, String partitionKey,
+      int numPartitions) {
+    PartitionInfo[] partitionInfoMap =
+        getTablePartitionInfo(offlineTableName, partitionKey, 
numPartitions).getPartitionInfoMap();
     ColocatedPartitionInfo[] colocatedPartitionInfoMap = new 
ColocatedPartitionInfo[numPartitions];
     for (int i = 0; i < numPartitions; i++) {
       PartitionInfo partitionInfo = partitionInfoMap[i];
@@ -428,9 +403,10 @@ public class WorkerManager {
     return new ColocatedTableInfo(colocatedPartitionInfoMap, null);
   }
 
-  private ColocatedTableInfo getRealtimeColocatedTableInfo(String 
realtimeTableName) {
-    PartitionInfo[] partitionInfoMap = 
getTablePartitionInfo(realtimeTableName).getPartitionInfoMap();
-    int numPartitions = partitionInfoMap.length;
+  private ColocatedTableInfo getRealtimeColocatedTableInfo(String 
realtimeTableName, String partitionKey,
+      int numPartitions) {
+    PartitionInfo[] partitionInfoMap =
+        getTablePartitionInfo(realtimeTableName, partitionKey, 
numPartitions).getPartitionInfoMap();
     ColocatedPartitionInfo[] colocatedPartitionInfoMap = new 
ColocatedPartitionInfo[numPartitions];
     for (int i = 0; i < numPartitions; i++) {
       PartitionInfo partitionInfo = partitionInfoMap[i];
@@ -465,15 +441,26 @@ public class WorkerManager {
     }
   }
 
+  /**
+   * Picks an enabled server deterministically based on the given index to 
pick.
+   */
   @Nullable
-  private static ServerInstance pickRandomEnabledServer(Set<String> candidates,
-      Map<String, ServerInstance> enabledServerInstanceMap) {
-    if (candidates.isEmpty()) {
+  private static ServerInstance pickEnabledServer(Set<String> candidates,
+      Map<String, ServerInstance> enabledServerInstanceMap, long indexToPick) {
+    int numCandidates = candidates.size();
+    if (numCandidates == 0) {
       return null;
     }
+    if (numCandidates == 1) {
+      return enabledServerInstanceMap.get(candidates.iterator().next());
+    }
+    List<String> candidateList = new ArrayList<>(candidates);
+    candidateList.sort(null);
+    int startIndex = (int) ((indexToPick & Long.MAX_VALUE) % numCandidates);
     String[] servers = candidates.toArray(new String[0]);
     ArrayUtils.shuffle(servers, RANDOM);
-    for (String server : servers) {
+    for (int i = 0; i < numCandidates; i++) {
+      String server = candidateList.get((startIndex + i) % numCandidates);
       ServerInstance serverInstance = enabledServerInstanceMap.get(server);
       if (serverInstance != null) {
         return serverInstance;
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index c6308d0854..59aa44553d 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -22,13 +22,19 @@ import 
com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import javax.annotation.Nullable;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.core.routing.RoutingManager;
+import org.apache.pinot.core.routing.TablePartitionInfo;
+import org.apache.pinot.core.routing.TablePartitionInfo.PartitionInfo;
 import org.apache.pinot.query.catalog.PinotCatalog;
 import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.testutils.MockRoutingManagerFactory;
@@ -72,7 +78,7 @@ public class QueryEnvironmentTestBase {
   @BeforeClass
   public void setUp() {
     // the port doesn't matter as we are not actually making a server call.
-    _queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, 
SERVER1_SEGMENTS, SERVER2_SEGMENTS);
+    _queryEnvironment = getQueryEnvironment(3, 1, 2, TABLE_SCHEMAS, 
SERVER1_SEGMENTS, SERVER2_SEGMENTS, null);
   }
 
   @DataProvider(name = "testQueryDataProvider")
@@ -96,9 +102,8 @@ public class QueryEnvironmentTestBase {
         new Object[]{"SELECT SUM(a.col3), COUNT(*) FROM a WHERE a.col3 >= 0 
AND a.col2 = 'a'"},
         new Object[]{"SELECT AVG(a.col3), SUM(a.col3), COUNT(a.col3) FROM a"},
         new Object[]{"SELECT a.col1, AVG(a.col3), SUM(a.col3), COUNT(a.col3) 
FROM a GROUP BY a.col1"},
-        // TODO: support BOOL_AND and BOOL_OR as MIN/MAX
-//        new Object[]{"SELECT BOOL_AND(a.col5), BOOL_OR(a.col5) FROM a"},
-//        new Object[]{"SELECT a.col3, BOOL_AND(a.col5), BOOL_OR(a.col5) FROM 
a GROUP BY a.col3"},
+        new Object[]{"SELECT BOOL_AND(a.col5), BOOL_OR(a.col5) FROM a"},
+        new Object[]{"SELECT a.col3, BOOL_AND(a.col5), BOOL_OR(a.col5) FROM a 
GROUP BY a.col3"},
         new Object[]{"SELECT KURTOSIS(a.col2), COUNT(DISTINCT a.col3), 
SKEWNESS(a.col3) FROM a"},
         new Object[]{"SELECT a.col1, KURTOSIS(a.col2), SKEWNESS(a.col3) FROM a 
GROUP BY a.col1"},
         new Object[]{"SELECT COUNT(a.col3), AVG(a.col3), SUM(a.col3), 
MIN(a.col3), MAX(a.col3) FROM a"},
@@ -140,9 +145,8 @@ public class QueryEnvironmentTestBase {
         new Object[]{"SELECT RANK() OVER(PARTITION BY a.col2 ORDER BY a.col1) 
FROM a"},
         new Object[]{"SELECT DENSE_RANK() OVER(ORDER BY a.col1) FROM a"},
         new Object[]{"SELECT a.col1, SUM(a.col3) OVER (ORDER BY a.col2), 
MIN(a.col3) OVER (ORDER BY a.col2) FROM a"},
-        new Object[]{"SELECT /*+ 
joinOptions(is_colocated_by_join_keys='true'), "
-            + "aggOptions(is_partitioned_by_group_by_keys='true') */ a.col3, 
a.col1, SUM(b.col3) FROM a JOIN b "
-            + "ON a.col3 = b.col3 GROUP BY a.col3, a.col1"},
+        new Object[]{"SELECT /*+ 
aggOptions(is_partitioned_by_group_by_keys='true') */ a.col3, a.col1, 
SUM(b.col3) "
+            + "FROM a JOIN b ON a.col3 = b.col3 GROUP BY a.col3, a.col1"},
         new Object[]{"SELECT /*+ 
aggOptions(is_skip_leaf_stage_group_by='true') */ a.col2, COUNT(*), 
SUM(a.col3), "
             + "SUM(a.col1) FROM a WHERE a.col3 >= 0 AND a.col2 = 'a' GROUP BY 
a.col2 HAVING COUNT(*) > 10 "
             + "AND MAX(a.col3) >= 0 AND MIN(a.col3) < 20 AND SUM(a.col3) <= 10 
AND AVG(a.col3) = 5"},
@@ -165,7 +169,8 @@ public class QueryEnvironmentTestBase {
   }
 
   public static QueryEnvironment getQueryEnvironment(int reducerPort, int 
port1, int port2,
-      Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1, 
Map<String, List<String>> segmentMap2) {
+      Map<String, Schema> schemaMap, Map<String, List<String>> segmentMap1, 
Map<String, List<String>> segmentMap2,
+      @Nullable Map<String, Pair<String, List<List<String>>>> 
partitionedSegmentsMap) {
     MockRoutingManagerFactory factory = new MockRoutingManagerFactory(port1, 
port2);
     for (Map.Entry<String, Schema> entry : schemaMap.entrySet()) {
       factory.registerTable(entry.getValue(), entry.getKey());
@@ -180,7 +185,28 @@ public class QueryEnvironmentTestBase {
         factory.registerSegment(port2, entry.getKey(), segment);
       }
     }
-    RoutingManager routingManager = factory.buildRoutingManager();
+    Map<String, TablePartitionInfo> partitionInfoMap = null;
+    if (MapUtils.isNotEmpty(partitionedSegmentsMap)) {
+      partitionInfoMap = new HashMap<>();
+      for (Map.Entry<String, Pair<String, List<List<String>>>> entry : 
partitionedSegmentsMap.entrySet()) {
+        String tableNameWithType = entry.getKey();
+        String partitionColumn = entry.getValue().getLeft();
+        List<List<String>> partitionIdToSegmentsMap = 
entry.getValue().getRight();
+        int numPartitions = partitionIdToSegmentsMap.size();
+        String hostname1 = MockRoutingManagerFactory.toHostname(port1);
+        String hostname2 = MockRoutingManagerFactory.toHostname(port2);
+        PartitionInfo[] partitionIdToInfoMap = new 
PartitionInfo[numPartitions];
+        for (int i = 0; i < numPartitions; i++) {
+          String hostname = i < (numPartitions / 2) ? hostname1 : hostname2;
+          partitionIdToInfoMap[i] = new 
PartitionInfo(Collections.singleton(hostname), partitionIdToSegmentsMap.get(i));
+        }
+        TablePartitionInfo tablePartitionInfo =
+            new TablePartitionInfo(tableNameWithType, partitionColumn, 
"hashCode", numPartitions, partitionIdToInfoMap,
+                Collections.emptySet());
+        partitionInfoMap.put(tableNameWithType, tablePartitionInfo);
+      }
+    }
+    RoutingManager routingManager = 
factory.buildRoutingManager(partitionInfoMap);
     TableCache tableCache = factory.buildTableCache();
     return new QueryEnvironment(new TypeFactory(new TypeSystem()),
         CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 58db9c7f41..7442ade6e2 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -21,8 +21,10 @@ package org.apache.pinot.query.testutils;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.helix.model.InstanceConfig;
@@ -34,7 +36,6 @@ import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.TablePartitionInfo;
 import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -51,62 +52,52 @@ public class MockRoutingManagerFactory {
   private static final String TIME_BOUNDARY_COLUMN = "ts";
   private static final String HOST_NAME = "localhost";
 
-  private final HashMap<String, String> _tableNameMap;
+  private final Map<String, String> _tableNameMap;
   private final Map<String, Schema> _schemaMap;
-
+  private final Set<String> _hybridTables;
   private final Map<String, ServerInstance> _serverInstances;
-  private final Map<String, RoutingTable> _routingTableMap;
-  private final List<String> _hybridTables;
-
-  private final Map<String, Map<ServerInstance, List<String>>> 
_tableServerSegmentMap;
+  private final Map<String, Map<ServerInstance, List<String>>> 
_tableServerSegmentsMap;
 
   public MockRoutingManagerFactory(int... ports) {
-    _hybridTables = new ArrayList<>();
-    _serverInstances = new HashMap<>();
-    _schemaMap = new HashMap<>();
     _tableNameMap = new HashMap<>();
-    _routingTableMap = new HashMap<>();
-
-    _tableServerSegmentMap = new HashMap<>();
+    _schemaMap = new HashMap<>();
+    _hybridTables = new HashSet<>();
+    _serverInstances = new HashMap<>();
+    _tableServerSegmentsMap = new HashMap<>();
     for (int port : ports) {
       _serverInstances.put(toHostname(port), getServerInstance(HOST_NAME, 
port, port, port, port));
     }
   }
 
-  public MockRoutingManagerFactory registerTable(Schema schema, String 
tableName) {
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType == null) {
-      registerTableNameWithType(schema, 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
-      registerTableNameWithType(schema, 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
-      _hybridTables.add(tableName);
+  public void registerTable(Schema schema, String tableName) {
+    if (TableNameBuilder.isTableResource(tableName)) {
+      registerTableNameWithType(schema, tableName);
     } else {
-      registerTableNameWithType(schema, 
TableNameBuilder.forType(tableType).tableNameWithType(tableName));
+      registerTableNameWithType(schema, 
TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+      registerTableNameWithType(schema, 
TableNameBuilder.REALTIME.tableNameWithType(tableName));
+      _hybridTables.add(tableName);
     }
-    return this;
   }
 
-  public MockRoutingManagerFactory registerSegment(int insertToServerPort, 
String tableNameWithType,
-      String segmentName) {
-    Map<ServerInstance, List<String>> serverSegmentMap =
-        _tableServerSegmentMap.getOrDefault(tableNameWithType, new 
HashMap<>());
-    ServerInstance serverInstance = 
_serverInstances.get(toHostname(insertToServerPort));
+  private void registerTableNameWithType(Schema schema, String 
tableNameWithType) {
+    _tableNameMap.put(tableNameWithType, tableNameWithType);
+    _schemaMap.put(TableNameBuilder.extractRawTableName(tableNameWithType), 
schema);
+  }
 
-    List<String> sSegments = serverSegmentMap.getOrDefault(serverInstance, new 
ArrayList<>());
-    sSegments.add(segmentName);
-    serverSegmentMap.put(serverInstance, sSegments);
-    _tableServerSegmentMap.put(tableNameWithType, serverSegmentMap);
-    return this;
+  public void registerSegment(int insertToServerPort, String 
tableNameWithType, String segmentName) {
+    ServerInstance serverInstance = 
_serverInstances.get(toHostname(insertToServerPort));
+    _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new 
HashMap<>())
+        .computeIfAbsent(serverInstance, k -> new 
ArrayList<>()).add(segmentName);
   }
 
-  public RoutingManager buildRoutingManager() {
-    // create all the fake routing tables
-    _routingTableMap.clear();
-    for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry : 
_tableServerSegmentMap.entrySet()) {
+  public RoutingManager buildRoutingManager(@Nullable Map<String, 
TablePartitionInfo> partitionInfoMap) {
+    Map<String, RoutingTable> routingTableMap = new HashMap<>();
+    for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry : 
_tableServerSegmentsMap.entrySet()) {
       String tableNameWithType = tableEntry.getKey();
       RoutingTable fakeRoutingTable = new RoutingTable(tableEntry.getValue(), 
Collections.emptyList(), 0);
-      _routingTableMap.put(tableNameWithType, fakeRoutingTable);
+      routingTableMap.put(tableNameWithType, fakeRoutingTable);
     }
-    return new FakeRoutingManager(_routingTableMap, _serverInstances, 
_hybridTables);
+    return new FakeRoutingManager(routingTableMap, _hybridTables, 
partitionInfoMap, _serverInstances);
   }
 
   public TableCache buildTableCache() {
@@ -119,7 +110,7 @@ public class MockRoutingManagerFactory {
     return mock;
   }
 
-  private static String toHostname(int port) {
+  public static String toHostname(int port) {
     return String.format("%s_%d", HOST_NAME, port);
   }
 
@@ -137,23 +128,18 @@ public class MockRoutingManagerFactory {
     return new ServerInstance(instanceConfig);
   }
 
-  private void registerTableNameWithType(Schema schema, String 
tableNameWithType) {
-    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-    _tableNameMap.put(tableNameWithType, rawTableName);
-    _schemaMap.put(rawTableName, schema);
-    _schemaMap.put(tableNameWithType, schema);
-  }
-
   private static class FakeRoutingManager implements RoutingManager {
     private final Map<String, RoutingTable> _routingTableMap;
+    private final Set<String> _hybridTables;
+    private final Map<String, TablePartitionInfo> _partitionInfoMap;
     private final Map<String, ServerInstance> _serverInstances;
-    private final List<String> _hybridTables;
 
-    public FakeRoutingManager(Map<String, RoutingTable> routingTableMap, 
Map<String, ServerInstance> serverInstances,
-        List<String> hybridTables) {
+    public FakeRoutingManager(Map<String, RoutingTable> routingTableMap, 
Set<String> hybridTables,
+        @Nullable Map<String, TablePartitionInfo> partitionInfoMap, 
Map<String, ServerInstance> serverInstances) {
       _routingTableMap = routingTableMap;
-      _serverInstances = serverInstances;
       _hybridTables = hybridTables;
+      _partitionInfoMap = partitionInfoMap;
+      _serverInstances = serverInstances;
     }
 
     @Override
@@ -163,9 +149,8 @@ public class MockRoutingManagerFactory {
 
     @Override
     public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long 
requestId) {
-      String tableName = 
brokerRequest.getPinotQuery().getDataSource().getTableName();
-      return _routingTableMap.getOrDefault(tableName,
-          
_routingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
+      String tableNameWithType = 
brokerRequest.getPinotQuery().getDataSource().getTableName();
+      return _routingTableMap.get(tableNameWithType);
     }
 
     @Override
@@ -173,9 +158,10 @@ public class MockRoutingManagerFactory {
       return _routingTableMap.containsKey(tableNameWithType);
     }
 
+    @Nullable
     @Override
-    public TimeBoundaryInfo getTimeBoundaryInfo(String tableName) {
-      String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+    public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) {
+      String rawTableName = 
TableNameBuilder.extractRawTableName(offlineTableName);
       return _hybridTables.contains(rawTableName) ? new 
TimeBoundaryInfo(TIME_BOUNDARY_COLUMN,
           String.valueOf(System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(1))) : null;
     }
@@ -183,7 +169,7 @@ public class MockRoutingManagerFactory {
     @Nullable
     @Override
     public TablePartitionInfo getTablePartitionInfo(String tableNameWithType) {
-      return null;
+      return _partitionInfoMap != null ? 
_partitionInfoMap.get(tableNameWithType) : null;
     }
 
     @Override
diff --git 
a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json 
b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
index f5ba9d3182..892b4f060b 100644
--- a/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
+++ b/pinot-query-planner/src/test/resources/queries/PinotHintablePlans.json
@@ -1,33 +1,19 @@
 {
   "pinot_hint_option_tests": {
     "queries": [
-      {
-        "description": "SELECT * inner join with filter on one table",
-        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
joinOptions(is_colocated_by_join_keys='true') */ * FROM a JOIN b ON a.col1 = 
b.col2 WHERE a.col3 >= 0",
-        "output": [
-          "Execution Plan",
-          "\nLogicalJoin(condition=[=($0, $8)], joinType=[inner])",
-          "\n  PinotLogicalExchange(distribution=[single])",
-          "\n    LogicalFilter(condition=[>=($2, 0)])",
-          "\n      LogicalTableScan(table=[[a]])",
-          "\n  PinotLogicalExchange(distribution=[single])",
-          "\n    LogicalTableScan(table=[[b]])",
-          "\n"
-        ]
-      },
       {
         "description": "Inner join with group by",
-        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
joinOptions(is_colocated_by_join_keys='true'), 
aggOptions(is_partitioned_by_group_by_keys='true') */a.col1, AVG(b.col3) FROM a 
JOIN b ON a.col1 = b.col2  WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 
GROUP BY a.col1",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
aggOptions(is_partitioned_by_group_by_keys='true') */ a.col1, AVG(b.col3) FROM 
a JOIN b ON a.col1 = b.col2  WHERE a.col3 >= 0 AND a.col2 = 'a' AND b.col3 < 0 
GROUP BY a.col1",
         "output": [
           "Execution Plan",
           "\nLogicalProject(col1=[$0], EXPR$1=[/(CAST($1):DOUBLE NOT NULL, 
$2)])",
           "\n  LogicalAggregate(group=[{0}], agg#0=[$SUM0($2)], 
agg#1=[COUNT()])",
           "\n    LogicalJoin(condition=[=($0, $1)], joinType=[inner])",
-          "\n      PinotLogicalExchange(distribution=[single])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalProject(col1=[$0])",
           "\n          LogicalFilter(condition=[AND(>=($2, 0), =($1, 'a'))])",
           "\n            LogicalTableScan(table=[[a]])",
-          "\n      PinotLogicalExchange(distribution=[single])",
+          "\n      PinotLogicalExchange(distribution=[hash[0]])",
           "\n        LogicalProject(col2=[$1], col3=[$2])",
           "\n          LogicalFilter(condition=[<($2, 0)])",
           "\n            LogicalTableScan(table=[[b]])",
@@ -36,7 +22,7 @@
       },
       {
         "description": "semi-join with dynamic_broadcast join strategy",
-        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
joinOptions(join_strategy='dynamic_broadcast',is_colocated_by_join_keys='false')
 */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 
0)",
+        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
joinOptions(join_strategy='dynamic_broadcast') */ a.col1, a.col2 FROM a WHERE 
a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
         "output": [
           "Execution Plan",
           "\nPinotLogicalExchange(distribution=[hash[0]])",
@@ -50,37 +36,6 @@
           "\n"
         ]
       },
-      {
-        "description": "semi-join with colocated join key",
-        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
joinOptions(is_colocated_by_join_keys) */ * FROM a WHERE a.col1 IN (SELECT col2 
FROM b WHERE b.col3 > 0)",
-        "output": [
-          "Execution Plan",
-          "\nLogicalJoin(condition=[=($0, $7)], joinType=[semi])",
-          "\n  PinotLogicalExchange(distribution=[hash[0]])",
-          "\n    LogicalTableScan(table=[[a]])",
-          "\n  PinotLogicalExchange(distribution=[hash[0]])",
-          "\n    LogicalProject(col2=[$1], col3=[$2])",
-          "\n      LogicalFilter(condition=[>($2, 0)])",
-          "\n        LogicalTableScan(table=[[b]])",
-          "\n"
-        ]
-      },
-      {
-        "description": "semi-join with colocated join key and 
dynamic_broadcast join strategy",
-        "sql": "EXPLAIN PLAN FOR SELECT /*+ 
joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true') */ a.col1, a.col2 FROM a WHERE a.col1 IN 
(SELECT col2 FROM b WHERE b.col3 > 0)",
-        "output": [
-          "Execution Plan",
-          "\nPinotLogicalExchange(distribution=[single])",
-          "\n  LogicalJoin(condition=[=($0, $2)], joinType=[semi])",
-          "\n    LogicalProject(col1=[$0], col2=[$1])",
-          "\n      LogicalTableScan(table=[[a]])",
-          "\n    PinotLogicalExchange(distribution=[single], 
relExchangeType=[PIPELINE_BREAKER])",
-          "\n      LogicalProject(col2=[$1], col3=[$2])",
-          "\n        LogicalFilter(condition=[>($2, 0)])",
-          "\n          LogicalTableScan(table=[[b]])",
-          "\n"
-        ]
-      },
       {
         "description": "semi-join with dynamic_broadcast join strategy then 
group-by on same key",
         "sql": "EXPLAIN PLAN FOR SELECT /*+ 
joinOptions(join_strategy='dynamic_broadcast'), 
aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE 
a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 149fc39587..1731f34fe9 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -93,25 +93,27 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
   @BeforeClass
   public void setUp()
       throws Exception {
-    MockInstanceDataManagerFactory factory1 = new 
MockInstanceDataManagerFactory("server1")
-        .registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
-        .registerTable(SCHEMA_BUILDER.setSchemaName("b").build(), "b_REALTIME")
-        .registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
-        .registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d")
-        .addSegment("a_REALTIME", buildRows("a_REALTIME"))
-        .addSegment("a_REALTIME", buildRows("a_REALTIME"))
-        .addSegment("b_REALTIME", buildRows("b_REALTIME"))
-        .addSegment("c_OFFLINE", buildRows("c_OFFLINE"))
-        .addSegment("d_OFFLINE", buildRows("d_OFFLINE"));
-    MockInstanceDataManagerFactory factory2 = new 
MockInstanceDataManagerFactory("server2")
-        .registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), "a_REALTIME")
-        .registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), "c_OFFLINE")
-        .registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d")
-        .addSegment("a_REALTIME", buildRows("a_REALTIME"))
-        .addSegment("c_OFFLINE", buildRows("c_OFFLINE"))
-        .addSegment("c_OFFLINE", buildRows("c_OFFLINE"))
-        .addSegment("d_OFFLINE", buildRows("d_OFFLINE"))
-        .addSegment("d_REALTIME", buildRows("d_REALTIME"));
+    MockInstanceDataManagerFactory factory1 = new 
MockInstanceDataManagerFactory("server1");
+    factory1.registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), 
"a_REALTIME");
+    factory1.registerTable(SCHEMA_BUILDER.setSchemaName("b").build(), 
"b_REALTIME");
+    factory1.registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), 
"c_OFFLINE");
+    factory1.registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d");
+    factory1.addSegment("a_REALTIME", buildRows("a_REALTIME"));
+    factory1.addSegment("a_REALTIME", buildRows("a_REALTIME"));
+    factory1.addSegment("b_REALTIME", buildRows("b_REALTIME"));
+    factory1.addSegment("c_OFFLINE", buildRows("c_OFFLINE"));
+    factory1.addSegment("d_OFFLINE", buildRows("d_OFFLINE"));
+
+    MockInstanceDataManagerFactory factory2 = new 
MockInstanceDataManagerFactory("server2");
+    factory2.registerTable(SCHEMA_BUILDER.setSchemaName("a").build(), 
"a_REALTIME");
+    factory2.registerTable(SCHEMA_BUILDER.setSchemaName("c").build(), 
"c_OFFLINE");
+    factory2.registerTable(SCHEMA_BUILDER.setSchemaName("d").build(), "d");
+    factory2.addSegment("a_REALTIME", buildRows("a_REALTIME"));
+    factory2.addSegment("c_OFFLINE", buildRows("c_OFFLINE"));
+    factory2.addSegment("c_OFFLINE", buildRows("c_OFFLINE"));
+    factory2.addSegment("d_OFFLINE", buildRows("d_OFFLINE"));
+    factory2.addSegment("d_REALTIME", buildRows("d_REALTIME"));
+
     QueryServerEnclosure server1 = new QueryServerEnclosure(factory1);
     QueryServerEnclosure server2 = new QueryServerEnclosure(factory2);
 
@@ -135,9 +137,10 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
     _reducerScheduler.startAsync();
     _mailboxService.start();
 
-    _queryEnvironment = 
QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, 
server1.getPort(),
-        server2.getPort(), factory1.buildSchemaMap(), 
factory1.buildTableSegmentNameMap(),
-        factory2.buildTableSegmentNameMap());
+    _queryEnvironment =
+        QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, 
server1.getPort(), server2.getPort(),
+            factory1.getRegisteredSchemaMap(), 
factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(),
+            null);
     server1.start();
     server2.start();
     // this doesn't test the QueryServer functionality so the server port can 
be the same as the mailbox port.
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index fc78863778..cf691b2f09 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -94,7 +94,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
     HashJoinOperator joinOnString =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
@@ -132,7 +132,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator joinOnInt =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnInt.nextBlock();
@@ -167,7 +167,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, false);
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
     HashJoinOperator joinOnInt =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnInt.nextBlock();
@@ -209,7 +209,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.LEFT,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
@@ -244,7 +244,7 @@ public class HashJoinOperatorTest {
         });
     List<RexExpression> joinClauses = new ArrayList<>();
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
@@ -276,7 +276,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.LEFT,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
@@ -312,7 +312,7 @@ public class HashJoinOperatorTest {
         });
 
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
@@ -351,7 +351,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, false);
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -390,7 +390,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses, false);
+        getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -425,7 +425,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.RIGHT,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator joinOnNum =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = joinOnNum.nextBlock();
@@ -475,7 +475,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.SEMI,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -515,7 +515,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.FULL,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -568,7 +568,7 @@ public class HashJoinOperatorTest {
         DataSchema.ColumnDataType.STRING
     });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.ANTI,
-        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses, false);
+        getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
     TransferableBlock result = join.nextBlock();
@@ -607,7 +607,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
@@ -641,7 +641,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
@@ -678,7 +678,7 @@ public class HashJoinOperatorTest {
             DataSchema.ColumnDataType.STRING
         });
     JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, 
JoinRelType.INNER,
-        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses, false);
+        getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses);
     HashJoinOperator join =
         new HashJoinOperator(OperatorTestUtil.getDefaultContext(), 
_leftOperator, _rightOperator, leftSchema, node);
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index a9aeb1ec2a..5318a9b580 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -151,7 +151,7 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode mailboxReceiveNode2 =
         new MailboxReceiveNode(0, DATA_SCHEMA, 2, 
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, 
JoinRelType.INNER, null, null, false);
+    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, 
JoinRelType.INNER, null, null);
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(mailboxReceiveNode2);
     DistributedStagePlan distributedStagePlan =
@@ -246,7 +246,7 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode incorrectlyConfiguredMailboxNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 3, 
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, 
JoinRelType.INNER, null, null, false);
+    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, 
JoinRelType.INNER, null, null);
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
     DistributedStagePlan distributedStagePlan =
@@ -284,7 +284,7 @@ public class PipelineBreakerExecutorTest {
     MailboxReceiveNode incorrectlyConfiguredMailboxNode =
         new MailboxReceiveNode(0, DATA_SCHEMA, 2, 
RelDistribution.Type.SINGLETON, PinotRelExchangeType.PIPELINE_BREAKER,
             null, null, false, false, null);
-    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, 
JoinRelType.INNER, null, null, false);
+    JoinNode joinNode = new JoinNode(0, DATA_SCHEMA, DATA_SCHEMA, DATA_SCHEMA, 
JoinRelType.INNER, null, null);
     joinNode.addInput(mailboxReceiveNode1);
     joinNode.addInput(incorrectlyConfiguredMailboxNode);
     DistributedStagePlan distributedStagePlan =
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 7db0386c15..dda0981b41 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -37,6 +37,8 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.response.broker.BrokerResponseNativeV2;
 import org.apache.pinot.common.response.broker.BrokerResponseStats;
@@ -71,6 +73,7 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
   private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
   private static final Random RANDOM = new Random(42);
   private static final String FILE_FILTER_PROPERTY = "pinot.fileFilter";
+  private static final int NUM_PARTITIONS = 4;
 
   private final Map<String, Set<String>> _tableToSegmentMap = new HashMap<>();
 
@@ -85,6 +88,7 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
     setH2Connection();
 
     // Scan through all the test cases.
+    Map<String, Pair<String, List<List<String>>>> partitionedSegmentsMap = new 
HashMap<>();
     for (Map.Entry<String, QueryTestCase> testCaseEntry : 
getTestCases().entrySet()) {
       String testCaseName = testCaseEntry.getKey();
       QueryTestCase testCase = testCaseEntry.getValue();
@@ -98,51 +102,54 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
         boolean allowEmptySegment = 
!BooleanUtils.toBoolean(extractExtraProps(testCase._extraProps, 
"noEmptySegment"));
         String tableName = testCaseName + "_" + tableEntry.getKey();
         // Testing only OFFLINE table b/c Hybrid table test is a special case 
to test separately.
-        String tableNameWithType = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
-        org.apache.pinot.spi.data.Schema pinotSchema = 
constructSchema(tableName, tableEntry.getValue()._schema);
+        String offlineTableName = 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
+        Schema pinotSchema = constructSchema(tableName, 
tableEntry.getValue()._schema);
         schemaMap.put(tableName, pinotSchema);
-        factory1.registerTable(pinotSchema, tableNameWithType);
-        factory2.registerTable(pinotSchema, tableNameWithType);
+        factory1.registerTable(pinotSchema, offlineTableName);
+        factory2.registerTable(pinotSchema, offlineTableName);
         List<QueryTestCase.ColumnAndType> columnAndTypes = 
tableEntry.getValue()._schema;
         List<GenericRow> genericRows = toRow(columnAndTypes, 
tableEntry.getValue()._inputs);
 
         // generate segments and dump into server1 and server2
         List<String> partitionColumns = 
tableEntry.getValue()._partitionColumns;
+        String partitionColumn = null;
+        List<List<String>> partitionIdToSegmentsMap = null;
+        if (partitionColumns != null && partitionColumns.size() == 1) {
+          partitionColumn = partitionColumns.get(0);
+          partitionIdToSegmentsMap = new ArrayList<>();
+          for (int i = 0; i < NUM_PARTITIONS; i++) {
+            partitionIdToSegmentsMap.add(new ArrayList<>());
+          }
+        }
 
-        List<GenericRow> rows1 = new ArrayList<>();
-        List<GenericRow> rows2 = new ArrayList<>();
+        List<List<GenericRow>> partitionIdToRowsMap = new ArrayList<>();
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+          partitionIdToRowsMap.add(new ArrayList<>());
+        }
 
         for (GenericRow row : genericRows) {
           if (row == SEGMENT_BREAKER_ROW) {
-            if (allowEmptySegment || rows1.size() > 0) {
-              factory1.addSegment(tableNameWithType, rows1);
-              rows1 = new ArrayList<>();
-            }
-            if (allowEmptySegment || rows2.size() > 0) {
-              factory2.addSegment(tableNameWithType, rows2);
-              rows2 = new ArrayList<>();
-            }
+            addSegments(factory1, factory2, offlineTableName, 
allowEmptySegment, partitionIdToRowsMap,
+                partitionIdToSegmentsMap);
           } else {
-            long partition = 0;
+            int partitionId;
             if (partitionColumns == null) {
-              partition = RANDOM.nextInt(2);
+              partitionId = RANDOM.nextInt(NUM_PARTITIONS);
             } else {
+              int hashCode = 0;
               for (String field : partitionColumns) {
-                partition = (partition + row.getValue(field).hashCode()) % 42;
+                hashCode += row.getValue(field).hashCode();
               }
+              partitionId = (hashCode & Integer.MAX_VALUE) % NUM_PARTITIONS;
             }
-            if (partition % 2 == 0) {
-              rows1.add(row);
-            } else {
-              rows2.add(row);
-            }
+            partitionIdToRowsMap.get(partitionId).add(row);
           }
         }
-        if (allowEmptySegment || rows1.size() > 0) {
-          factory1.addSegment(tableNameWithType, rows1);
-        }
-        if (allowEmptySegment || rows2.size() > 0) {
-          factory2.addSegment(tableNameWithType, rows2);
+        addSegments(factory1, factory2, offlineTableName, allowEmptySegment, 
partitionIdToRowsMap,
+            partitionIdToSegmentsMap);
+
+        if (partitionColumn != null) {
+          partitionedSegmentsMap.put(offlineTableName, 
Pair.of(partitionColumn, partitionIdToSegmentsMap));
         }
       }
 
@@ -197,7 +204,8 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
 
     _queryEnvironment =
         QueryEnvironmentTestBase.getQueryEnvironment(_reducerGrpcPort, 
server1.getPort(), server2.getPort(),
-            factory1.buildSchemaMap(), factory1.buildTableSegmentNameMap(), 
factory2.buildTableSegmentNameMap());
+            factory1.getRegisteredSchemaMap(), 
factory1.buildTableSegmentNameMap(), factory2.buildTableSegmentNameMap(),
+            partitionedSegmentsMap);
     server1.start();
     server2.start();
     // this doesn't test the QueryServer functionality so the server port can 
be the same as the mailbox port.
@@ -208,6 +216,22 @@ public class ResourceBasedQueriesTest extends 
QueryRunnerTestBase {
     _servers.put(new QueryServerInstance("localhost", port2, port2), server2);
   }
 
+  private void addSegments(MockInstanceDataManagerFactory factory1, 
MockInstanceDataManagerFactory factory2,
+      String offlineTableName, boolean allowEmptySegment, 
List<List<GenericRow>> partitionIdToRowsMap,
+      @Nullable List<List<String>> partitionIdToSegmentsMap) {
+    for (int i = 0; i < NUM_PARTITIONS; i++) {
+      MockInstanceDataManagerFactory factory = i < (NUM_PARTITIONS / 2) ? 
factory1 : factory2;
+      List<GenericRow> rows = partitionIdToRowsMap.get(i);
+      if (allowEmptySegment || !rows.isEmpty()) {
+        String segmentName = factory.addSegment(offlineTableName, rows);
+        if (partitionIdToSegmentsMap != null) {
+          partitionIdToSegmentsMap.get(i).add(segmentName);
+        }
+        rows.clear();
+      }
+    }
+  }
+
   @AfterClass
   public void tearDown() {
     
DataTableBuilderFactory.setDataTableVersion(DataTableBuilderFactory.DEFAULT_VERSION);
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 62311432ee..397ed19fd2 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -85,7 +85,7 @@ public class QueryDispatcherTest extends QueryTestSet {
     // reducer port doesn't matter, we are testing the worker instance not 
GRPC.
     _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, 
portList.get(0), portList.get(1),
         QueryEnvironmentTestBase.TABLE_SCHEMAS, 
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
-        QueryEnvironmentTestBase.SERVER2_SEGMENTS);
+        QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
   }
 
   @AfterClass
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index affc23f826..7c432046b7 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -95,7 +95,7 @@ public class QueryServerTest extends QueryTestSet {
     // reducer port doesn't matter, we are testing the worker instance not 
GRPC.
     _queryEnvironment = QueryEnvironmentTestBase.getQueryEnvironment(1, 
portList.get(0), portList.get(1),
         QueryEnvironmentTestBase.TABLE_SCHEMAS, 
QueryEnvironmentTestBase.SERVER1_SEGMENTS,
-        QueryEnvironmentTestBase.SERVER2_SEGMENTS);
+        QueryEnvironmentTestBase.SERVER2_SEGMENTS, null);
   }
 
   @AfterClass
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index 1b27d24c53..fd3ef7a374 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -34,6 +34,7 @@ import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoa
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -44,7 +45,7 @@ import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
-import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -52,11 +53,18 @@ import static org.mockito.Mockito.when;
 public class MockInstanceDataManagerFactory {
   private static final String DATA_DIR_PREFIX = "MockInstanceDataDir";
 
-  private final Map<String, List<GenericRow>> _tableRowsMap;
+  // Key is table name with type
   private final Map<String, List<ImmutableSegment>> _tableSegmentMap;
   private final Map<String, List<String>> _tableSegmentNameMap;
   private final Map<String, File> _serverTableDataDirMap;
+
+  // Key is raw table name
+  private final Map<String, List<GenericRow>> _tableRowsMap;
   private final Map<String, Schema> _schemaMap;
+
+  // Key is registered table (with or without type)
+  private final Map<String, Schema> _registeredSchemaMap;
+
   private String _serverName;
 
   public MockInstanceDataManagerFactory(String serverName) {
@@ -66,20 +74,29 @@ public class MockInstanceDataManagerFactory {
     _tableSegmentNameMap = new HashMap<>();
     _tableRowsMap = new HashMap<>();
     _schemaMap = new HashMap<>();
+    _registeredSchemaMap = new HashMap<>();
   }
 
-  public MockInstanceDataManagerFactory registerTable(Schema schema, String 
tableName) {
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    if (tableType == null) {
-      registerTableNameWithType(schema, 
TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName));
-      registerTableNameWithType(schema, 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName));
-    } else {
+  public void registerTable(Schema schema, String tableName) {
+    _registeredSchemaMap.put(tableName, schema);
+    if (TableNameBuilder.isTableResource(tableName)) {
+      _schemaMap.put(TableNameBuilder.extractRawTableName(tableName), schema);
       registerTableNameWithType(schema, tableName);
+    } else {
+      _schemaMap.put(tableName, schema);
+      registerTableNameWithType(schema, 
TableNameBuilder.OFFLINE.tableNameWithType(tableName));
+      registerTableNameWithType(schema, 
TableNameBuilder.REALTIME.tableNameWithType(tableName));
     }
-    return this;
   }
 
-  public MockInstanceDataManagerFactory addSegment(String tableNameWithType, 
List<GenericRow> rows) {
+  private void registerTableNameWithType(Schema schema, String 
tableNameWithType) {
+    File tableDataDir = new File(FileUtils.getTempDirectory(),
+        String.format("%s_%s_%s", DATA_DIR_PREFIX, _serverName, 
tableNameWithType));
+    FileUtils.deleteQuietly(tableDataDir);
+    _serverTableDataDirMap.put(tableNameWithType, tableDataDir);
+  }
+
+  public String addSegment(String tableNameWithType, List<GenericRow> rows) {
     String segmentName = String.format("%s_%s", tableNameWithType, 
UUID.randomUUID());
     File tableDataDir = _serverTableDataDirMap.get(tableNameWithType);
     ImmutableSegment segment = buildSegment(tableNameWithType, tableDataDir, 
segmentName, rows);
@@ -96,7 +113,8 @@ public class MockInstanceDataManagerFactory {
     List<GenericRow> tableRows = _tableRowsMap.getOrDefault(rawTableName, new 
ArrayList<>());
     tableRows.addAll(rows);
     _tableRowsMap.put(rawTableName, tableRows);
-    return this;
+
+    return segmentName;
   }
 
   public InstanceDataManager buildInstanceDataManager() {
@@ -107,11 +125,15 @@ public class MockInstanceDataManagerFactory {
       tableDataManagers.put(e.getKey(), tableDataManager);
     }
     for (Map.Entry<String, TableDataManager> e : tableDataManagers.entrySet()) 
{
-      when(instanceDataManager.getTableDataManager(e.getKey())).thenAnswer(inv 
-> e.getValue());
+      
when(instanceDataManager.getTableDataManager(e.getKey())).thenReturn(e.getValue());
     }
     return instanceDataManager;
   }
 
+  public Map<String, Schema> getRegisteredSchemaMap() {
+    return _registeredSchemaMap;
+  }
+
   public Map<String, Schema> buildSchemaMap() {
     return _schemaMap;
   }
@@ -125,10 +147,13 @@ public class MockInstanceDataManagerFactory {
   }
 
   private TableDataManager mockTableDataManager(List<ImmutableSegment> 
segmentList) {
-    List<SegmentDataManager> tableSegmentDataManagers =
-        
segmentList.stream().map(ImmutableSegmentDataManager::new).collect(Collectors.toList());
+    Map<String, SegmentDataManager> segmentDataManagerMap =
+        
segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName, 
ImmutableSegmentDataManager::new));
     TableDataManager tableDataManager = mock(TableDataManager.class);
-    when(tableDataManager.acquireSegments(any(), 
any())).thenReturn(tableSegmentDataManagers);
+    when(tableDataManager.acquireSegments(anyList(), 
anyList())).thenAnswer(invocation -> {
+      List<String> segments = invocation.getArgument(0);
+      return 
segments.stream().map(segmentDataManagerMap::get).collect(Collectors.toList());
+    });
     return tableDataManager;
   }
 
@@ -137,9 +162,9 @@ public class MockInstanceDataManagerFactory {
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     // TODO: plugin table config constructor
-    TableConfig tableConfig = new 
TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts")
-        .build();
-    Schema schema = _schemaMap.get(tableNameWithType);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(tableType).setTableName(rawTableName).setTimeColumnName("ts").build();
+    Schema schema = _schemaMap.get(rawTableName);
     SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
     config.setOutDir(indexDir.getPath());
     config.setTableName(tableNameWithType);
@@ -154,13 +179,4 @@ public class MockInstanceDataManagerFactory {
       throw new RuntimeException("Unable to construct immutable segment from 
records", e);
     }
   }
-
-  private void registerTableNameWithType(Schema schema, String 
tableNameWithType) {
-    File tableDataDir = new File(FileUtils.getTempDirectory(),
-        String.format("%s_%s_%s", DATA_DIR_PREFIX, _serverName, 
tableNameWithType));
-    FileUtils.deleteQuietly(tableDataDir);
-    _serverTableDataDirMap.put(tableNameWithType, tableDataDir);
-    _schemaMap.put(TableNameBuilder.extractRawTableName(tableNameWithType), 
schema);
-    _schemaMap.put(tableNameWithType, schema);
-  }
 }
diff --git a/pinot-query-runtime/src/test/resources/queries/Aggregates.json 
b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
index 9f14015514..4719c77202 100644
--- a/pinot-query-runtime/src/test/resources/queries/Aggregates.json
+++ b/pinot-query-runtime/src/test/resources/queries/Aggregates.json
@@ -654,13 +654,13 @@
         "description": "nested aggregation",
         "sql": "SELECT min(max(int_col)) FROM {tbl}",
         "comments": ".*Aggregate expressions cannot be nested.",
-        "expectedException": ".*Error composing query plan for.*"
+        "expectedException": "Error composing query plan for.*"
       },
       {
         "psql": "4.2.7",
         "description": "nested aggregation",
         "sql": "SELECT (SELECT max(min(int_col)) FROM {tbl}) from {tbl};",
-        "expectedException": ".*Error composing query plan for.*"
+        "expectedException": "Error composing query plan for.*"
       },
       {
         "psql": "4.2.7",
diff --git a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json 
b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
index 9c5cea93f3..973d83b70c 100644
--- a/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
+++ b/pinot-query-runtime/src/test/resources/queries/CountDistinct.json
@@ -117,14 +117,14 @@
         "outputs": [["b", 6], ["a", 6]]
       },
       {
-        "comments": "table aren't actually partitioned by val thus all 
segments can produce duplicate results, thus [[6]]",
+        "comments": "table aren't actually partitioned by val thus all 
segments can produce duplicate results, thus [[8]]",
         "sql": "SELECT SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1}",
-        "outputs": [[6]]
+        "outputs": [[8]]
       },
       {
-        "comments": "table aren't actually partitioned by val thus all 
segments can produce duplicate results, thus [[b, 4], [a, 4]]",
+        "comments": "table aren't actually partitioned by val thus all 
segments can produce duplicate results, thus [[b, 5], [a, 4]]",
         "sql": "SELECT groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) 
FROM {tbl1} GROUP BY groupingCol",
-        "outputs": [["b", 4], ["a", 4]]
+        "outputs": [["b", 5], ["a", 4]]
       },
       {
         "sql": "SELECT l.groupingCol, 
SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), 
SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON 
l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
@@ -135,9 +135,9 @@
         "outputs": [["b", 6], ["a", 6]]
       },
       {
-        "comments": "table aren't actually partitioned by val thus all 
segments can produce duplicate results, thus [[b, 4], [a, 4]]",
+        "comments": "table aren't actually partitioned by val thus all 
segments can produce duplicate results, thus [[b, 5], [a, 4]]",
         "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ 
groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(val) FROM {tbl1} GROUP BY 
groupingCol",
-        "outputs": [["b", 4], ["a", 4]]
+        "outputs": [["b", 5], ["a", 4]]
       },
       {
         "sql": "SELECT /*+ aggOptions(is_skip_leaf_stage_aggregate='true') */ 
l.groupingCol, SEGMENT_PARTITIONED_DISTINCT_COUNT(l.val), 
SEGMENT_PARTITIONED_DISTINCT_COUNT(r.val) FROM {tbl1} l JOIN {tbl2} r ON 
l.groupingCol = r.groupingCol GROUP BY l.groupingCol",
diff --git a/pinot-query-runtime/src/test/resources/queries/QueryHints.json 
b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
index e9721994d9..940b318f1a 100644
--- a/pinot-query-runtime/src/test/resources/queries/QueryHints.json
+++ b/pinot-query-runtime/src/test/resources/queries/QueryHints.json
@@ -51,37 +51,55 @@
       }
     },
     "queries": [
+      {
+        "description": "Wrong partition key",
+        "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ 
tableOptions(partition_key='name', partition_size='4') */ GROUP BY {tbl1}.num",
+        "expectedException": "Error composing query plan for.*"
+      },
+      {
+        "description": "Wrong partition size",
+        "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='2') */ GROUP BY {tbl1}.num",
+        "expectedException": "Error composing query plan for.*"
+      },
+      {
+        "description": "Group by partition column",
+        "sql": "SELECT {tbl1}.num, COUNT(*) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ GROUP BY {tbl1}.num"
+      },
       {
         "description": "Colocated JOIN with partition column",
-        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true') */ 
{tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM {tbl1} JOIN {tbl2} ON 
{tbl1}.num = {tbl2}.num"
+        "sql": "SELECT {tbl1}.num, {tbl1}.name, {tbl2}.num, {tbl2}.val FROM 
{tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} 
/*+ tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num"
       },
       {
         "description": "Colocated JOIN with partition column and group by 
partition column",
-        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), 
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, {tbl1}.name, 
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY 
{tbl1}.num, {tbl1}.name"
+        "sql": "SELECT /*+ aggOptions(is_partitioned_by_group_by_keys='true') 
*/ {tbl1}.num, {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num GROUP BY {tbl1}.num, {tbl1}.name"
       },
       {
         "description": "Colocated JOIN with partition column and group by 
non-partitioned column",
-        "sql": "SELECT /*+ joinOptions(is_colocated_by_join_keys='true'), 
aggOptions(is_partitioned_by_group_by_keys='false') */ {tbl1}.name, 
SUM({tbl2}.num) FROM {tbl1} JOIN {tbl2} ON {tbl1}.num = {tbl2}.num GROUP BY 
{tbl1}.name"
+        "sql": "SELECT {tbl1}.name, SUM({tbl2}.num) FROM {tbl1} /*+ 
tableOptions(partition_key='num', partition_size='4') */ JOIN {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ ON {tbl1}.num = 
{tbl2}.num GROUP BY {tbl1}.name"
+      },
+      {
+        "description": "Colocated JOIN with partition column and group by 
non-partitioned column with stage parallelism",
+        "sql": "SET stageParallelism=2; SELECT {tbl1}.name, SUM({tbl2}.num) 
FROM {tbl1} /*+ tableOptions(partition_key='num', partition_size='4') */ JOIN 
{tbl2} /*+ tableOptions(partition_key='num', partition_size='4') */ ON 
{tbl1}.num = {tbl2}.num GROUP BY {tbl1}.name"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true') */ {tbl1}.num, {tbl1}.name FROM {tbl1} WHERE 
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 
'yyy'))"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.num, {tbl1}.name FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy'))"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column and group by partition column",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true'), 
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, 
COUNT({tbl1}.name) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM 
{tbl2} WHERE {tbl2}.val IN ('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), 
aggOptions(is_partitioned_by_group_by_keys='true') */ {tbl1}.num, 
COUNT({tbl1}.name) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy')) GROUP BY {tbl1}.num, {tbl1}.name"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partition 
column and group by non-partitioned column",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE 
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val IN ('xxx', 
'yyy')) GROUP BY {tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val IN 
('xxx', 'yyy')) GROUP BY {tbl1}.name"
       },
       {
         "description": "Dynamic broadcast SEMI-JOIN with empty right table 
result",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM 
{tbl2} WHERE {tbl2}.val = 'non-exist') GROUP BY {tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 
'non-exist') GROUP BY {tbl1}.name"
       },
       {
         "description": "Colocated, Dynamic broadcast SEMI-JOIN with partially 
empty right table result for some servers",
-        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast', 
is_colocated_by_join_keys='true') */ {tbl1}.name, COUNT(*) FROM {tbl1} WHERE 
{tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} WHERE {tbl2}.val = 'z') GROUP BY 
{tbl1}.name"
+        "sql": "SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ 
{tbl1}.name, COUNT(*) FROM {tbl1} /*+ tableOptions(partition_key='num', 
partition_size='4') */ WHERE {tbl1}.num IN (SELECT {tbl2}.num FROM {tbl2} /*+ 
tableOptions(partition_key='num', partition_size='4') */ WHERE {tbl2}.val = 
'z') GROUP BY {tbl1}.name"
       },
       {
         "description": "Skip leaf stage aggregation with GROUP BY hint",
diff --git a/pinot-query-runtime/src/test/resources/queries/SelectHaving.json 
b/pinot-query-runtime/src/test/resources/queries/SelectHaving.json
index 9aad9542ca..1041bc7d4f 100644
--- a/pinot-query-runtime/src/test/resources/queries/SelectHaving.json
+++ b/pinot-query-runtime/src/test/resources/queries/SelectHaving.json
@@ -44,12 +44,12 @@
       {
         "comment": "Plan failed. Expression 'a' is not being grouped.",
         "sql":"SELECT a FROM {test_having} HAVING min(a) < max(a);",
-        "expectedException": ".*Error composing query plan.*"
+        "expectedException": "Error composing query plan for.*"
       },
       {
         "comment": "Plan failed. Expression 'a' is not being grouped.",
         "sql":"SELECT 1 AS one FROM {test_having} HAVING a > 1;",
-        "expectedException": ".*Error composing query plan.*"
+        "expectedException": "Error composing query plan for.*"
       },
       {
         "sql":"SELECT 1 AS one FROM {test_having} HAVING 1 > 2;"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to