This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0a3530fc [multistage] hybrid routing support (#9379)
ff0a3530fc is described below

commit ff0a3530fc0b4b92191c7598829e220994c0e68f
Author: Rong Rong <[email protected]>
AuthorDate: Sun Sep 11 15:28:41 2022 -0700

    [multistage] hybrid routing support (#9379)
    
    Preliminary support for hybrid routing on the multistage engine.
    
    It copies some of the logic from the broker request side, but not all, and 
only has basic testing.
    - copied some time boundary logic (manager and associated logic) into 
either the core module or replicated in the multi-stage planner
    - created a hybrid routing dispatchable StageMetadata
      - indexed each segment with table types
      - added time boundary info if dispatching multiple table types, otherwise 
ignore
      - reconstructed server requests multiple times.
    
    
    Co-authored-by: Rong Rong <[email protected]>
---
 .../broker/api/resources/PinotBrokerDebug.java     |  2 +-
 .../requesthandler/BaseBrokerRequestHandler.java   |  2 +-
 .../pinot/broker/routing/BrokerRoutingManager.java |  3 +-
 .../routing/timeboundary/TimeBoundaryManager.java  |  1 +
 .../broker/broker/HelixBrokerStarterTest.java      |  2 +-
 .../timeboundary/TimeBoundaryManagerTest.java      |  1 +
 pinot-common/src/main/proto/worker.proto           |  8 ++-
 .../apache/pinot/core/routing/RoutingManager.java  |  8 +++
 .../pinot/core/routing}/TimeBoundaryInfo.java      |  2 +-
 .../apache/pinot/query/planner/StageMetadata.java  | 21 +++++-
 .../apache/pinot/query/routing/WorkerManager.java  | 69 ++++++++++++++----
 .../apache/pinot/query/QueryCompilationTest.java   | 11 ++-
 .../pinot/query/QueryEnvironmentTestBase.java      |  1 +
 .../pinot/query/QueryEnvironmentTestUtils.java     | 17 +++--
 .../apache/pinot/query/runtime/QueryRunner.java    | 84 ++++++++++++++--------
 .../runtime/plan/serde/QueryPlanSerDeUtils.java    | 37 ++++++++--
 .../query/runtime/utils/ServerRequestUtils.java    | 82 +++++++++++++++++----
 .../apache/pinot/query/QueryServerEnclosure.java   |  4 +-
 .../query/runtime/QueryRunnerExceptionTest.java    |  4 +-
 .../pinot/query/runtime/QueryRunnerTest.java       |  3 +
 .../pinot/query/runtime/QueryRunnerTestBase.java   | 13 ++--
 21 files changed, 287 insertions(+), 88 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 67d69a1ba2..6252ae5630 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -41,8 +41,8 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 07d9b321f7..27f7f32202 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -50,7 +50,6 @@ import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.http.MultiHttpRequest;
@@ -74,6 +73,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.optimizer.QueryOptimizer;
 import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.core.util.QueryOptionsUtils;
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 2f5cc95173..04ddf7f37a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -47,7 +47,6 @@ import 
org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
 import org.apache.pinot.broker.routing.segmentpruner.SegmentPrunerFactory;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelectorFactory;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryManager;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -56,6 +55,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.spi.config.table.QueryConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -596,6 +596,7 @@ public class BrokerRoutingManager implements 
RoutingManager, ClusterChangeHandle
    * <p>NOTE: Time boundary info is only available for the offline part of the 
hybrid table.
    */
   @Nullable
+  @Override
   public TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName) {
     RoutingEntry routingEntry = _routingEntryMap.get(offlineTableName);
     if (routingEntry == null) {
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index 17100de6df..3aeed3e35a 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -33,6 +33,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index ddab2f7bf9..0fc2ddcd2e 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -25,7 +25,6 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
-import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -33,6 +32,7 @@ import 
org.apache.pinot.controller.api.exception.InvalidTableConfigException;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index fb0d664874..34271815f4 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -33,6 +33,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
diff --git a/pinot-common/src/main/proto/worker.proto 
b/pinot-common/src/main/proto/worker.proto
index 87aecc8391..8f780bd260 100644
--- a/pinot-common/src/main/proto/worker.proto
+++ b/pinot-common/src/main/proto/worker.proto
@@ -68,7 +68,13 @@ message StagePlan {
 message StageMetadata {
   repeated string instances = 1;
   repeated string dataSources = 2;
-  map<string, SegmentList> instanceToSegmentList = 3;
+  map<string, SegmentMap> instanceToSegmentMap = 3;
+  string timeColumn = 4;
+  string timeValue = 5;
+}
+
+message SegmentMap {
+  map<string, SegmentList> tableTypeToSegmentList = 1;
 }
 
 message SegmentList {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
index fbb3f5bf00..db535dcaa6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingManager.java
@@ -59,4 +59,12 @@ public interface RoutingManager {
    * @return true if the route table exists.
    */
   boolean routingExists(String tableNameWithType);
+
+  /**
+   * Acquire the time boundary info. Useful for hybrid logical table queries 
that needs to split between
+   * realtime and offline.
+   * @param offlineTableName offline table name
+   * @return time boundary info.
+   */
+  TimeBoundaryInfo getTimeBoundaryInfo(String offlineTableName);
 }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryInfo.java
 b/pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java
similarity index 95%
rename from 
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryInfo.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java
index 5efbafadea..f6d804f5f0 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryInfo.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.timeboundary;
+package org.apache.pinot.core.routing;
 
 public class TimeBoundaryInfo {
   private final String _timeColumn;
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index 8e691003c9..fe2531f9b6 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
@@ -45,12 +46,17 @@ public class StageMetadata implements Serializable {
   private List<ServerInstance> _serverInstances;
 
   // used for table scan stage.
-  private Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  private Map<ServerInstance, Map<String, List<String>>> 
_serverInstanceToSegmentsMap;
+
+  // time boundary info
+  private TimeBoundaryInfo _timeBoundaryInfo;
+
 
   public StageMetadata() {
     _scannedTables = new ArrayList<>();
     _serverInstances = new ArrayList<>();
     _serverInstanceToSegmentsMap = new HashMap<>();
+    _timeBoundaryInfo = null;
   }
 
   public void attach(StageNode stageNode) {
@@ -67,11 +73,12 @@ public class StageMetadata implements Serializable {
   // attached physical plan context.
   // -----------------------------------------------
 
-  public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+  public Map<ServerInstance, Map<String, List<String>>> 
getServerInstanceToSegmentsMap() {
     return _serverInstanceToSegmentsMap;
   }
 
-  public void setServerInstanceToSegmentsMap(Map<ServerInstance, List<String>> 
serverInstanceToSegmentsMap) {
+  public void setServerInstanceToSegmentsMap(
+      Map<ServerInstance, Map<String, List<String>>> 
serverInstanceToSegmentsMap) {
     _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
   }
 
@@ -82,4 +89,12 @@ public class StageMetadata implements Serializable {
   public void setServerInstances(List<ServerInstance> serverInstances) {
     _serverInstances = serverInstances;
   }
+
+  public TimeBoundaryInfo getTimeBoundaryInfo() {
+    return _timeBoundaryInfo;
+  }
+
+  public void setTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
+    _timeBoundaryInfo = timeBoundaryInfo;
+  }
 }
diff --git 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 03f4ddcd9c..9238c33ca4 100644
--- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ 
b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.routing;
 
+import com.clearspring.analytics.util.Preconditions;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -26,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.PlannerUtils;
 import org.apache.pinot.query.planner.StageMetadata;
@@ -58,11 +60,38 @@ public class WorkerManager {
 
   public void assignWorkerToStage(int stageId, StageMetadata stageMetadata) {
     List<String> scannedTables = stageMetadata.getScannedTables();
-    if (scannedTables.size() == 1) { // table scan stage, need to attach 
server as well as segment info.
-      RoutingTable routingTable = getRoutingTable(scannedTables.get(0));
-      Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = 
routingTable.getServerInstanceToSegmentsMap();
+    if (scannedTables.size() == 1) {
+      // table scan stage, need to attach server as well as segment info for 
each physical table type.
+      String logicalTableName = scannedTables.get(0);
+      Map<String, RoutingTable> routingTableMap = 
getRoutingTable(logicalTableName);
+      // acquire time boundary info if it is a hybrid table.
+      if (routingTableMap.size() > 1) {
+        TimeBoundaryInfo timeBoundaryInfo = 
_routingManager.getTimeBoundaryInfo(TableNameBuilder
+            
.forType(TableType.OFFLINE).tableNameWithType(TableNameBuilder.extractRawTableName(logicalTableName)));
+        if (timeBoundaryInfo != null) {
+          stageMetadata.setTimeBoundaryInfo(timeBoundaryInfo);
+        } else {
+          // remove offline table routing if no time boundary info is acquired.
+          routingTableMap.remove(TableType.OFFLINE.name());
+        }
+      }
+
+      // extract all the instances associated to each table type
+      Map<ServerInstance, Map<String, List<String>>> 
serverInstanceToSegmentsMap = new HashMap<>();
+      for (Map.Entry<String, RoutingTable> routingEntry : 
routingTableMap.entrySet()) {
+        String tableType = routingEntry.getKey();
+        RoutingTable routingTable = routingEntry.getValue();
+        // for each server instance, attach all table types and their 
associated segment list.
+        for (Map.Entry<ServerInstance, List<String>> serverEntry
+            : routingTable.getServerInstanceToSegmentsMap().entrySet()) {
+          serverInstanceToSegmentsMap.putIfAbsent(serverEntry.getKey(), new 
HashMap<>());
+          Map<String, List<String>> tableTypeToSegmentListMap = 
serverInstanceToSegmentsMap.get(serverEntry.getKey());
+          Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, 
serverEntry.getValue()) == null,
+              "Entry for server {} and table type: {} already exist!", 
serverEntry.getKey(), tableType);
+        }
+      }
       stageMetadata.setServerInstances(new 
ArrayList<>(serverInstanceToSegmentsMap.keySet()));
-      stageMetadata.setServerInstanceToSegmentsMap(new 
HashMap<>(serverInstanceToSegmentsMap));
+      
stageMetadata.setServerInstanceToSegmentsMap(serverInstanceToSegmentsMap);
     } else if (PlannerUtils.isRootStage(stageId)) {
       // ROOT stage doesn't have a QueryServer as it is strictly only reducing 
results.
       // here we simply assign the worker instance with identical 
server/mailbox port number.
@@ -86,13 +115,29 @@ public class WorkerManager {
     return serverInstances;
   }
 
-  private RoutingTable getRoutingTable(String tableName) {
-    String rawTableName = TableNameBuilder.extractRawTableName(tableName);
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableName);
-    // TODO: support both offline and realtime, now default only query the 
OFFLINE table.
-    tableType = tableType == null ? TableType.OFFLINE : tableType;
-    String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(rawTableName);
-    return 
_routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(
-        "SELECT * FROM " + tableNameWithType));
+  /**
+   * Acquire routing table for items listed in {@link 
org.apache.pinot.query.planner.stage.TableScanNode}.
+   *
+   * @param logicalTableName it can either be a hybrid table name or a 
physical table name with table type.
+   * @return keyed-map from table type(s) to routing table(s).
+   */
+  private Map<String, RoutingTable> getRoutingTable(String logicalTableName) {
+    String rawTableName = 
TableNameBuilder.extractRawTableName(logicalTableName);
+    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(logicalTableName);
+    Map<String, RoutingTable> routingTableMap = new HashMap<>();
+    if (tableType == null) {
+      routingTableMap.put(TableType.OFFLINE.name(), 
getRoutingTable(rawTableName, TableType.OFFLINE));
+      routingTableMap.put(TableType.REALTIME.name(), 
getRoutingTable(rawTableName, TableType.REALTIME));
+    } else {
+      routingTableMap.put(tableType.name(), getRoutingTable(logicalTableName, 
tableType));
+    }
+    return routingTableMap;
+  }
+
+  private RoutingTable getRoutingTable(String tableName, TableType tableType) {
+    String tableNameWithType = 
TableNameBuilder.forType(tableType).tableNameWithType(
+        TableNameBuilder.extractRawTableName(tableName));
+    return _routingManager.getRoutingTable(
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + 
tableNameWithType));
   }
 }
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
index dcd7c5c11a..94c62c856e 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java
@@ -122,7 +122,7 @@ public class QueryCompilationTest extends 
QueryEnvironmentTestBase {
         // table scan stages; for tableA it should have 2 hosts, for tableB it 
should have only 1
         Assert.assertEquals(
             
e.getValue().getServerInstances().stream().map(ServerInstance::toString).collect(Collectors.toList()),
-            tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_1", 
"Server_localhost_2")
+            tables.get(0).equals("a") ? ImmutableList.of("Server_localhost_2", 
"Server_localhost_1")
                 : ImmutableList.of("Server_localhost_1"));
       } else if (!PlannerUtils.isRootStage(e.getKey())) {
         // join stage should have both servers used.
@@ -159,8 +159,8 @@ public class QueryCompilationTest extends 
QueryEnvironmentTestBase {
     List<StageMetadata> tableScanMetadataList = 
queryPlan.getStageMetadataMap().values().stream()
         .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 
0).collect(Collectors.toList());
     Assert.assertEquals(tableScanMetadataList.size(), 1);
-    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(), 
1);
-    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
 "Server_localhost_1");
+    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(), 
2);
+
     query = "SELECT * FROM d_REALTIME";
     queryPlan = _queryEnvironment.planQuery(query);
     tableScanMetadataList = queryPlan.getStageMetadataMap().values().stream()
@@ -169,15 +169,12 @@ public class QueryCompilationTest extends 
QueryEnvironmentTestBase {
     
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(), 
1);
     
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
 "Server_localhost_2");
 
-    // Default routing to OFFLINE table.
-    // TODO: change this test to assert actual time-boundary routing once we 
support this.
     query = "SELECT * FROM d";
     queryPlan = _queryEnvironment.planQuery(query);
     tableScanMetadataList = queryPlan.getStageMetadataMap().values().stream()
         .filter(stageMetadata -> stageMetadata.getScannedTables().size() != 
0).collect(Collectors.toList());
     Assert.assertEquals(tableScanMetadataList.size(), 1);
-    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(), 
1);
-    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().get(0).toString(),
 "Server_localhost_1");
+    
Assert.assertEquals(tableScanMetadataList.get(0).getServerInstances().size(), 
2);
   }
 
   // Test that plan query can be run as multi-thread.
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
index f83eea0fb3..65ea646e9c 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestBase.java
@@ -45,6 +45,7 @@ public class QueryEnvironmentTestBase {
     return new Object[][] {
         new Object[]{"SELECT * FROM a ORDER BY col1 LIMIT 10"},
         new Object[]{"SELECT * FROM b ORDER BY col1, col2 DESC LIMIT 10"},
+        new Object[]{"SELECT * FROM d"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 
>= 0"},
         new Object[]{"SELECT * FROM a JOIN b ON a.col1 = b.col2 WHERE a.col3 
>= 0 AND a.col3 > b.col3"},
diff --git 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
index e882d09db7..e8c29aeb02 100644
--- 
a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
+++ 
b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryEnvironmentTestUtils.java
@@ -24,12 +24,14 @@ import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
 import org.apache.pinot.common.config.provider.TableCache;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.catalog.PinotCatalog;
 import org.apache.pinot.query.planner.QueryPlan;
@@ -42,6 +44,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -54,10 +57,10 @@ public class QueryEnvironmentTestUtils {
   public static final Schema.SchemaBuilder SCHEMA_BUILDER;
   public static final Map<String, List<String>> SERVER1_SEGMENTS =
       ImmutableMap.of("a", Lists.newArrayList("a1", "a2"), "b", 
Lists.newArrayList("b1"), "c",
-          Lists.newArrayList("c1"), "d_O", Lists.newArrayList("d1", "d2"));
+          Lists.newArrayList("c1"), "d_O", Lists.newArrayList("d1"));
   public static final Map<String, List<String>> SERVER2_SEGMENTS =
       ImmutableMap.of("a", Lists.newArrayList("a3"), "c", 
Lists.newArrayList("c2", "c3"),
-          "d_R", Lists.newArrayList("d3", "d4"));
+          "d_R", Lists.newArrayList("d2"), "d_O", Lists.newArrayList("d3"));
 
   static {
     SCHEMA_BUILDER = new 
Schema.SchemaBuilder().addSingleValueDimension("col1", 
FieldSpec.DataType.STRING, "")
@@ -72,7 +75,7 @@ public class QueryEnvironmentTestUtils {
 
   public static TableCache mockTableCache() {
     TableCache mock = mock(TableCache.class);
-    when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a", "a", "b", 
"b", "c", "c",
+    when(mock.getTableNameMap()).thenReturn(ImmutableMap.of("a_REALTIME", "a", 
"b_REALTIME", "b", "c_REALTIME", "c",
         "d_OFFLINE", "d", "d_REALTIME", "d"));
     
when(mock.getSchema("a")).thenReturn(SCHEMA_BUILDER.setSchemaName("a").build());
     
when(mock.getSchema("b")).thenReturn(SCHEMA_BUILDER.setSchemaName("b").build());
@@ -108,7 +111,8 @@ public class QueryEnvironmentTestUtils {
     // hybrid table
     RoutingTable rtDOffline = mock(RoutingTable.class);
     RoutingTable rtDRealtime = mock(RoutingTable.class);
-    
when(rtDOffline.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host1,
 SERVER1_SEGMENTS.get("d_O")));
+    when(rtDOffline.getServerInstanceToSegmentsMap()).thenReturn(
+        ImmutableMap.of(host1, SERVER1_SEGMENTS.get("d_O"), host2, 
SERVER2_SEGMENTS.get("d_O")));
     
when(rtDRealtime.getServerInstanceToSegmentsMap()).thenReturn(ImmutableMap.of(host2,
 SERVER2_SEGMENTS.get("d_R")));
     Map<String, RoutingTable> mockRoutingTableMap = ImmutableMap.of("a", rtA, 
"b", rtB, "c", rtC,
         "d_OFFLINE", rtDOffline, "d_REALTIME", rtDRealtime);
@@ -121,6 +125,11 @@ public class QueryEnvironmentTestUtils {
           
mockRoutingTableMap.get(TableNameBuilder.extractRawTableName(tableName)));
     });
     
when(mock.getEnabledServerInstanceMap()).thenReturn(ImmutableMap.of(server1, 
host1, server2, host2));
+    when(mock.getTimeBoundaryInfo(anyString())).thenAnswer(invocation -> {
+      String offlineTableName = invocation.getArgument(0);
+      return "d_OFFLINE".equals(offlineTableName) ? new TimeBoundaryInfo("ts",
+          String.valueOf(System.currentTimeMillis() - 
TimeUnit.DAYS.toMillis(1))) : null;
+    });
     return mock;
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index bf53684d32..9419195a50 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.runtime;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -41,6 +42,7 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
 import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
@@ -48,12 +50,15 @@ import 
org.apache.pinot.query.runtime.utils.ServerRequestUtils;
 import org.apache.pinot.query.service.QueryConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * {@link QueryRunner} accepts a {@link DistributedStagePlan} and runs it.
  */
 public class QueryRunner {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryRunner.class);
   // This is a temporary before merging the 2 type of executor.
   private ServerQueryExecutorV1Impl _serverExecutor;
   private WorkerQueryExecutor _workerExecutor;
@@ -99,42 +104,51 @@ public class QueryRunner {
       // TODO: make server query request return via mailbox, this is a hack to 
gather the non-streaming data table
       // and package it here for return. But we should really use a 
MailboxSendOperator directly put into the
       // server executor.
-      ServerQueryRequest serverQueryRequest =
+      List<ServerQueryRequest> serverQueryRequests =
           ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, 
requestMetadataMap);
 
       // send the data table via mailbox in one-off fashion (e.g. no 
block-level split, one data table/partition key)
-      BaseDataBlock dataBlock;
-      try {
-        DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, 
executorService, null);
-        if (!dataTable.getExceptions().isEmpty()) {
-          // if contains exception, directly return a metadata block with the 
exceptions.
-          dataBlock = 
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
-        } else {
-          // this works because default DataTableImplV3 will have a version 
number at beginning:
-          // the new DataBlock encodes lower 16 bits as version and upper 16 
bits as type (ROW, COLUMNAR, METADATA)
-          dataBlock = 
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
-        }
-      } catch (Exception e) {
-        dataBlock = DataBlockUtils.getErrorDataBlock(e);
+      List<BaseDataBlock> serverQueryResults = new 
ArrayList<>(serverQueryRequests.size());
+      for (ServerQueryRequest request : serverQueryRequests) {
+        serverQueryResults.add(processServerQuery(request, executorService));
       }
 
       MailboxSendNode sendNode = (MailboxSendNode) 
distributedStagePlan.getStageRoot();
       StageMetadata receivingStageMetadata = 
distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId());
       MailboxSendOperator mailboxSendOperator =
           new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(),
-              new LeafStageTransferableBlockOperator(dataBlock, 
sendNode.getDataSchema()),
+              new LeafStageTransferableBlockOperator(serverQueryResults, 
sendNode.getDataSchema()),
               receivingStageMetadata.getServerInstances(), 
sendNode.getExchangeType(),
-              sendNode.getPartitionKeySelector(), _hostname, _port, 
serverQueryRequest.getRequestId(),
+              sendNode.getPartitionKeySelector(), _hostname, _port, 
serverQueryRequests.get(0).getRequestId(),
               sendNode.getStageId());
-      mailboxSendOperator.nextBlock();
-      if (dataBlock.getExceptions().isEmpty()) {
-        mailboxSendOperator.nextBlock();
+      int blockCounter = 0;
+      while 
(!TransferableBlockUtils.isEndOfStream(mailboxSendOperator.nextBlock())) {
+        LOGGER.debug("Acquired transferable block: {}", blockCounter++);
       }
     } else {
       _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, 
executorService);
     }
   }
 
+  private BaseDataBlock processServerQuery(ServerQueryRequest 
serverQueryRequest,
+      ExecutorService executorService) {
+    BaseDataBlock dataBlock;
+    try {
+      DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, 
executorService, null);
+      if (!dataTable.getExceptions().isEmpty()) {
+        // if contains exception, directly return a metadata block with the 
exceptions.
+        dataBlock = 
DataBlockUtils.getErrorDataBlock(dataTable.getExceptions());
+      } else {
+        // this works because default DataTableImplV3 will have a version 
number at beginning:
+        // the new DataBlock encodes lower 16 bits as version and upper 16 
bits as type (ROW, COLUMNAR, METADATA)
+        dataBlock = 
DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes()));
+      }
+    } catch (Exception e) {
+      dataBlock = DataBlockUtils.getErrorDataBlock(e);
+    }
+    return dataBlock;
+  }
+
   /**
   * Leaf-stage transfer block operator is used to wrap around the leaf stage 
process results. They are passed to the
   * Pinot server to execute the query, thus only one {@link DataTable} was 
returned. However, to conform with the
@@ -149,17 +163,17 @@ public class QueryRunner {
   private static class LeafStageTransferableBlockOperator extends 
BaseOperator<TransferableBlock> {
     private static final String EXPLAIN_NAME = "LEAF_STAGE_TRANSFER_OPERATOR";
 
-    private final MetadataBlock _endOfStreamBlock;
-    private final BaseDataBlock _baseDataBlock;
+    private final BaseDataBlock _errorBlock;
+    private final List<BaseDataBlock> _baseDataBlocks;
     private final DataSchema _dataSchema;
     private boolean _hasTransferred;
+    private int _currentIndex;
 
-    private LeafStageTransferableBlockOperator(BaseDataBlock baseDataBlock, 
DataSchema dataSchema) {
-      _baseDataBlock = baseDataBlock;
+    private LeafStageTransferableBlockOperator(List<BaseDataBlock> 
baseDataBlocks, DataSchema dataSchema) {
+      _baseDataBlocks = baseDataBlocks;
       _dataSchema = dataSchema;
-      _endOfStreamBlock = baseDataBlock.getExceptions().isEmpty()
-          ? DataBlockUtils.getEndOfStreamDataBlock(dataSchema) : null;
-      _hasTransferred = false;
+      _errorBlock = baseDataBlocks.stream().filter(e -> 
!e.getExceptions().isEmpty()).findFirst().orElse(null);
+      _currentIndex = 0;
     }
 
     @Override
@@ -175,11 +189,19 @@ public class QueryRunner {
 
     @Override
     protected TransferableBlock getNextBlock() {
-      if (!_hasTransferred) {
-        _hasTransferred = true;
-        return new TransferableBlock(_baseDataBlock);
+      if (_currentIndex < 0) {
+        throw new RuntimeException("Leaf transfer terminated. next block 
should no longer be called.");
+      }
+      if (_errorBlock != null) {
+        _currentIndex = -1;
+        return new TransferableBlock(_errorBlock);
       } else {
-        return new TransferableBlock(_endOfStreamBlock);
+        if (_currentIndex < _baseDataBlocks.size()) {
+          return new TransferableBlock(_baseDataBlocks.get(_currentIndex++));
+        } else {
+          _currentIndex = -1;
+          return new 
TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock(_dataSchema));
+        }
       }
     }
   }
@@ -188,7 +210,7 @@ public class QueryRunner {
     int stageId = distributedStagePlan.getStageId();
     ServerInstance serverInstance = distributedStagePlan.getServerInstance();
     StageMetadata stageMetadata = 
distributedStagePlan.getMetadataMap().get(stageId);
-    List<String> segments = 
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance);
+    Map<String, List<String>> segments = 
stageMetadata.getServerInstanceToSegmentsMap().get(serverInstance);
     return segments != null && segments.size() > 0;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
index 49c2d62dcb..776e9f4e22 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.AbstractStageNode;
@@ -80,12 +81,25 @@ public class QueryPlanSerDeUtils {
 
   private static StageMetadata fromWorkerStageMetadata(Worker.StageMetadata 
workerStageMetadata) {
     StageMetadata stageMetadata = new StageMetadata();
+    // scanned table
     
stageMetadata.getScannedTables().addAll(workerStageMetadata.getDataSourcesList());
+    // server instance to table-segments mapping
     for (String serverInstanceString : workerStageMetadata.getInstancesList()) 
{
       
stageMetadata.getServerInstances().add(stringToInstance(serverInstanceString));
     }
-    for (Map.Entry<String, Worker.SegmentList> e : 
workerStageMetadata.getInstanceToSegmentListMap().entrySet()) {
-      
stageMetadata.getServerInstanceToSegmentsMap().put(stringToInstance(e.getKey()),
 e.getValue().getSegmentsList());
+    for (Map.Entry<String, Worker.SegmentMap> instanceEntry
+        : workerStageMetadata.getInstanceToSegmentMapMap().entrySet()) {
+      Map<String, List<String>> tableToSegmentMap = new HashMap<>();
+      for (Map.Entry<String, Worker.SegmentList> tableEntry
+          : 
instanceEntry.getValue().getTableTypeToSegmentListMap().entrySet()) {
+        tableToSegmentMap.put(tableEntry.getKey(), 
tableEntry.getValue().getSegmentsList());
+      }
+      
stageMetadata.getServerInstanceToSegmentsMap().put(stringToInstance(instanceEntry.getKey()),
 tableToSegmentMap);
+    }
+    // time boundary info
+    if (!workerStageMetadata.getTimeColumn().isEmpty()) {
+      stageMetadata.setTimeBoundaryInfo(new 
TimeBoundaryInfo(workerStageMetadata.getTimeColumn(),
+          workerStageMetadata.getTimeValue()));
     }
     return stageMetadata;
   }
@@ -100,13 +114,26 @@ public class QueryPlanSerDeUtils {
 
   private static Worker.StageMetadata toWorkerStageMetadata(StageMetadata 
stageMetadata) {
     Worker.StageMetadata.Builder builder = Worker.StageMetadata.newBuilder();
+    // scanned table
     builder.addAllDataSources(stageMetadata.getScannedTables());
+    // server instance to table-segments mapping
     for (ServerInstance serverInstance : stageMetadata.getServerInstances()) {
       builder.addInstances(instanceToString(serverInstance));
     }
-    for (Map.Entry<ServerInstance, List<String>> e : 
stageMetadata.getServerInstanceToSegmentsMap().entrySet()) {
-      builder.putInstanceToSegmentList(instanceToString(e.getKey()),
-          
Worker.SegmentList.newBuilder().addAllSegments(e.getValue()).build());
+    for (Map.Entry<ServerInstance, Map<String, List<String>>> instanceEntry
+        : stageMetadata.getServerInstanceToSegmentsMap().entrySet()) {
+      Map<String, Worker.SegmentList> tableToSegmentMap = new HashMap<>();
+      for (Map.Entry<String, List<String>> tableEntry : 
instanceEntry.getValue().entrySet()) {
+        tableToSegmentMap.put(tableEntry.getKey(),
+            
Worker.SegmentList.newBuilder().addAllSegments(tableEntry.getValue()).build());
+      }
+      builder.putInstanceToSegmentMap(instanceToString(instanceEntry.getKey()),
+          
Worker.SegmentMap.newBuilder().putAllTableTypeToSegmentList(tableToSegmentMap).build());
+    }
+    // time boundary info
+    if (stageMetadata.getTimeBoundaryInfo() != null) {
+      
builder.setTimeColumn(stageMetadata.getTimeBoundaryInfo().getTimeColumn());
+      builder.setTimeValue(stageMetadata.getTimeBoundaryInfo().getTimeValue());
     }
     return builder.build();
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
index 07e02bdb8b..b14a35e1a5 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/utils/ServerRequestUtils.java
@@ -18,17 +18,23 @@
  */
 package org.apache.pinot.query.runtime.utils;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.DataSource;
+import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.PinotQuery;
 import org.apache.pinot.common.request.QuerySource;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
 import org.apache.pinot.query.parser.CalciteRexExpressionParser;
+import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.MailboxSendNode;
@@ -37,7 +43,10 @@ import org.apache.pinot.query.planner.stage.SortNode;
 import org.apache.pinot.query.planner.stage.StageNode;
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.sql.FilterKind;
 
 
 /**
@@ -54,24 +63,45 @@ public class ServerRequestUtils {
   }
 
   // TODO: This is a hack, make an actual ServerQueryRequest converter.
-  public static ServerQueryRequest 
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
+  public static List<ServerQueryRequest> 
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
       Map<String, String> requestMetadataMap) {
+    StageMetadata stageMetadata = 
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId());
+    Map<String, List<String>> tableToSegmentListMap = 
stageMetadata.getServerInstanceToSegmentsMap()
+        .get(distributedStagePlan.getServerInstance());
+    List<ServerQueryRequest> requests = new ArrayList<>();
+    for (Map.Entry<String, List<String>> tableEntry : 
tableToSegmentListMap.entrySet()) {
+      String tableType = tableEntry.getKey();
+      if (TableType.OFFLINE.name().equals(tableType)) {
+        requests.add(constructServerQueryRequest(distributedStagePlan, 
requestMetadataMap,
+            stageMetadata.getTimeBoundaryInfo(), TableType.OFFLINE, 
tableEntry.getValue()));
+      } else if (TableType.REALTIME.name().equals(tableType)) {
+        requests.add(constructServerQueryRequest(distributedStagePlan, 
requestMetadataMap,
+            stageMetadata.getTimeBoundaryInfo(), TableType.REALTIME, 
tableEntry.getValue()));
+      } else {
+        throw new IllegalArgumentException("Unsupported table type key: " + 
tableType);
+      }
+    }
+    return requests;
+  }
+
+  public static ServerQueryRequest 
constructServerQueryRequest(DistributedStagePlan distributedStagePlan,
+      Map<String, String> requestMetadataMap, TimeBoundaryInfo 
timeBoundaryInfo, TableType tableType,
+      List<String> segmentList) {
     InstanceRequest instanceRequest = new InstanceRequest();
     
instanceRequest.setRequestId(Long.parseLong(requestMetadataMap.get("REQUEST_ID")));
     instanceRequest.setBrokerId("unknown");
     instanceRequest.setEnableTrace(false);
-    instanceRequest.setSearchSegments(
-        
distributedStagePlan.getMetadataMap().get(distributedStagePlan.getStageId()).getServerInstanceToSegmentsMap()
-            .get(distributedStagePlan.getServerInstance()));
-    instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan));
+    instanceRequest.setSearchSegments(segmentList);
+    instanceRequest.setQuery(constructBrokerRequest(distributedStagePlan, 
tableType, timeBoundaryInfo));
     return new ServerQueryRequest(instanceRequest, new 
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()),
         System.currentTimeMillis());
   }
 
   // TODO: this is a hack, create a broker request object should not be needed 
because we rewrite the entire
   // query into stages already.
-  public static BrokerRequest constructBrokerRequest(DistributedStagePlan 
distributedStagePlan) {
-    PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan);
+  public static BrokerRequest constructBrokerRequest(DistributedStagePlan 
distributedStagePlan, TableType tableType,
+      TimeBoundaryInfo timeBoundaryInfo) {
+    PinotQuery pinotQuery = constructPinotQuery(distributedStagePlan, 
tableType, timeBoundaryInfo);
     BrokerRequest brokerRequest = new BrokerRequest();
     brokerRequest.setPinotQuery(pinotQuery);
     // Set table name in broker request because it is used for access control, 
query routing etc.
@@ -84,23 +114,29 @@ public class ServerRequestUtils {
     return brokerRequest;
   }
 
-  public static PinotQuery constructPinotQuery(DistributedStagePlan 
distributedStagePlan) {
+  public static PinotQuery constructPinotQuery(DistributedStagePlan 
distributedStagePlan, TableType tableType,
+      TimeBoundaryInfo timeBoundaryInfo) {
     PinotQuery pinotQuery = new PinotQuery();
     pinotQuery.setLimit(DEFAULT_LEAF_NODE_LIMIT);
     pinotQuery.setExplain(false);
-    walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery);
+    walkStageTree(distributedStagePlan.getStageRoot(), pinotQuery, tableType);
+    if (timeBoundaryInfo != null) {
+      attachTimeBoundary(pinotQuery, timeBoundaryInfo, tableType == 
TableType.OFFLINE);
+    }
     return pinotQuery;
   }
 
-  private static void walkStageTree(StageNode node, PinotQuery pinotQuery) {
+  private static void walkStageTree(StageNode node, PinotQuery pinotQuery, 
TableType tableType) {
     // this walkStageTree should only be a sequential walk.
     for (StageNode child : node.getInputs()) {
-      walkStageTree(child, pinotQuery);
+      walkStageTree(child, pinotQuery, tableType);
     }
     if (node instanceof TableScanNode) {
       TableScanNode tableScanNode = (TableScanNode) node;
       DataSource dataSource = new DataSource();
-      dataSource.setTableName(tableScanNode.getTableName());
+      String tableNameWithType = TableNameBuilder.forType(tableType)
+          
.tableNameWithType(TableNameBuilder.extractRawTableName(tableScanNode.getTableName()));
+      dataSource.setTableName(tableNameWithType);
       pinotQuery.setDataSource(dataSource);
       
pinotQuery.setSelectList(tableScanNode.getTableScanColumns().stream().map(RequestUtils::getIdentifierExpression)
           .collect(Collectors.toList()));
@@ -135,4 +171,26 @@ public class ServerRequestUtils {
       throw new UnsupportedOperationException("Unsupported logical plan node: 
" + node);
     }
   }
+
+  /**
+   * Helper method to attach the time boundary to the given PinotQuery.
+   */
+  private static void attachTimeBoundary(PinotQuery pinotQuery, 
TimeBoundaryInfo timeBoundaryInfo,
+      boolean isOfflineRequest) {
+    String timeColumn = timeBoundaryInfo.getTimeColumn();
+    String timeValue = timeBoundaryInfo.getTimeValue();
+    Expression timeFilterExpression = RequestUtils.getFunctionExpression(
+        isOfflineRequest ? FilterKind.LESS_THAN_OR_EQUAL.name() : 
FilterKind.GREATER_THAN.name());
+    timeFilterExpression.getFunctionCall().setOperands(
+        Arrays.asList(RequestUtils.getIdentifierExpression(timeColumn), 
RequestUtils.getLiteralExpression(timeValue)));
+
+    Expression filterExpression = pinotQuery.getFilterExpression();
+    if (filterExpression != null) {
+      Expression andFilterExpression = 
RequestUtils.getFunctionExpression(FilterKind.AND.name());
+      
andFilterExpression.getFunctionCall().setOperands(Arrays.asList(filterExpression,
 timeFilterExpression));
+      pinotQuery.setFilterExpression(andFilterExpression);
+    } else {
+      pinotQuery.setFilterExpression(timeFilterExpression);
+    }
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 22f4eed5ca..fa564e4bea 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -144,7 +145,8 @@ public class QueryServerEnclosure {
       row.putValue("col1", STRING_FIELD_LIST[i % STRING_FIELD_LIST.length]);
       row.putValue("col2", STRING_FIELD_LIST[i % (STRING_FIELD_LIST.length - 
2)]);
       row.putValue("col3", INT_FIELD_LIST[i % INT_FIELD_LIST.length]);
-      row.putValue("ts", System.currentTimeMillis());
+      row.putValue("ts", tableName.endsWith("_O")
+          ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(2) : 
System.currentTimeMillis());
       rows.add(row);
     }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
index 3b5a94d560..de70190b7f 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerExceptionTest.java
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
 public class QueryRunnerExceptionTest extends QueryRunnerTestBase {
 
   @Test(dataProvider = "testDataWithSqlExecutionExceptions")
-  public void testSqlWithFinalRowCountChecker(String sql, String exeptionMsg) {
+  public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
     QueryPlan queryPlan = _queryEnvironment.planQuery(sql);
     Map<String, String> requestMetadataMap =
         ImmutableMap.of("REQUEST_ID", 
String.valueOf(RANDOM_REQUEST_ID_GEN.nextLong()));
@@ -61,7 +61,7 @@ public class QueryRunnerExceptionTest extends 
QueryRunnerTestBase {
       QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator);
     } catch (RuntimeException rte) {
       Assert.assertTrue(rte.getMessage().contains("Received error query 
execution result block"));
-      Assert.assertTrue(rte.getMessage().contains(exeptionMsg));
+      Assert.assertTrue(rte.getMessage().contains(exceptionMsg));
     }
   }
 
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
index 48823ea18d..670710590d 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java
@@ -72,6 +72,9 @@ public class QueryRunnerTest extends QueryRunnerTestBase {
         // No match filter
         new Object[]{"SELECT * FROM b WHERE col3 < 0", 0},
 
+        // Hybrid table
+        new Object[]{"SELECT * FROM d", 15},
+
         // Specifically table A has 15 rows (10 on server1 and 5 on server2) 
and table B has 5 rows (all on server1),
         // thus the final JOIN result will be 15 x 1 = 15.
         // Next join with table C which has (5 on server1 and 10 on server2), 
since data is identical. each of the row
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index 22cf24e60f..9842501f00 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -46,9 +46,11 @@ import static 
org.apache.pinot.core.query.selection.SelectionOperatorUtils.extra
 public class QueryRunnerTestBase {
   private static final File INDEX_DIR_S1_A = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableA");
   private static final File INDEX_DIR_S1_B = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableB");
-  private static final File INDEX_DIR_S2_A = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
   private static final File INDEX_DIR_S1_C = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableC");
+  private static final File INDEX_DIR_S1_D = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server1_tableD");
+  private static final File INDEX_DIR_S2_A = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableA");
   private static final File INDEX_DIR_S2_C = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableC");
+  private static final File INDEX_DIR_S2_D = new 
File(FileUtils.getTempDirectory(), "QueryRunnerTest_server2_tableD");
 
   protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
 
@@ -73,11 +75,12 @@ public class QueryRunnerTestBase {
   public void setUp()
       throws Exception {
     DataTableFactory.setDataTableVersion(DataTableFactory.VERSION_4);
-    QueryServerEnclosure server1 = new 
QueryServerEnclosure(Lists.newArrayList("a", "b", "c"),
-        ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", 
INDEX_DIR_S1_C),
+    QueryServerEnclosure server1 = new 
QueryServerEnclosure(Lists.newArrayList("a", "b", "c", "d_O"),
+        ImmutableMap.of("a", INDEX_DIR_S1_A, "b", INDEX_DIR_S1_B, "c", 
INDEX_DIR_S1_C, "d_O", INDEX_DIR_S1_D),
         QueryEnvironmentTestUtils.SERVER1_SEGMENTS);
-    QueryServerEnclosure server2 = new 
QueryServerEnclosure(Lists.newArrayList("a", "c"),
-        ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C), 
QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
+    QueryServerEnclosure server2 = new 
QueryServerEnclosure(Lists.newArrayList("a", "c", "d_R", "d_O"),
+        ImmutableMap.of("a", INDEX_DIR_S2_A, "c", INDEX_DIR_S2_C, "d_R", 
INDEX_DIR_S2_D, "d_O", INDEX_DIR_S1_D),
+        QueryEnvironmentTestUtils.SERVER2_SEGMENTS);
 
     _reducerGrpcPort = QueryEnvironmentTestUtils.getAvailablePort();
     _reducerHostname = String.format("Broker_%s", 
QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to