This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ada54f1  [QueryContext] Refactor PlanMaker to use QueryContext (#5568)
ada54f1 is described below

commit ada54f11ac2515340cfc5c7f8ebcf66ec3069be3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jun 17 16:03:52 2020 -0700

    [QueryContext] Refactor PlanMaker to use QueryContext (#5568)
    
    Also addressed the TODO to directly use List<IndexSegment> instead of 
List<SegmentDataManager> for the instance level Plan
    Modified and simplified tests to apply the interface change
---
 .../core/plan/maker/InstancePlanMakerImplV2.java   | 153 ++++-----
 .../apache/pinot/core/plan/maker/PlanMaker.java    |  27 +-
 .../query/executor/ServerQueryExecutorV1Impl.java  |  10 +-
 ...adataAndDictionaryAggregationPlanMakerTest.java |  30 +-
 .../pinot/queries/BaseMultiValueQueriesTest.java   |  22 +-
 .../org/apache/pinot/queries/BaseQueriesTest.java  | 188 +++++------
 .../pinot/queries/BaseSingleValueQueriesTest.java  |  25 +-
 .../queries/DistinctCountThetaSketchTest.java      |  39 +--
 .../apache/pinot/queries/DistinctQueriesTest.java  |  42 ++-
 .../apache/pinot/queries/FastHllQueriesTest.java   |  11 +-
 .../queries/PercentileTDigestQueriesTest.java      |  32 +-
 .../RangePredicateWithSortedInvertedIndexTest.java | 249 +++++++-------
 .../queries/SelectionOnlyEarlyTerminationTest.java |   6 +-
 .../pinot/queries/SerializedBytesQueriesTest.java  |  28 +-
 .../pinot/queries/TextSearchQueriesTest.java       | 115 +++----
 .../apache/pinot/queries/TransformQueriesTest.java | 359 ++++++++-------------
 16 files changed, 561 insertions(+), 775 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 2103115..4b388a3 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -23,10 +23,7 @@ import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
 import org.apache.pinot.core.plan.AggregationGroupByPlanNode;
@@ -39,8 +36,11 @@ import 
org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.plan.SelectionPlanNode;
-import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.core.util.QueryOptions;
 import org.slf4j.Logger;
@@ -95,10 +95,26 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   }
 
   @Override
-  public PlanNode makeInnerSegmentPlan(IndexSegment indexSegment, 
BrokerRequest brokerRequest) {
-    if (brokerRequest.isSetAggregationsInfo()) {
-      if (brokerRequest.isSetGroupBy()) {
-        QueryOptions queryOptions = new 
QueryOptions(brokerRequest.getQueryOptions());
+  public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext 
queryContext,
+      ExecutorService executorService, long timeOutMs) {
+    List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+    for (IndexSegment indexSegment : indexSegments) {
+      planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
+    }
+    CombinePlanNode combinePlanNode =
+        new CombinePlanNode(planNodes, queryContext.getBrokerRequest(), 
executorService, timeOutMs, _numGroupsLimit);
+    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+  }
+
+  @Override
+  public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext 
queryContext) {
+    BrokerRequest brokerRequest = queryContext.getBrokerRequest();
+    if (QueryContextUtils.isAggregationQuery(queryContext)) {
+      // Aggregation query
+      List<ExpressionContext> groupByExpressions = 
queryContext.getGroupByExpressions();
+      if (groupByExpressions != null) {
+        // Aggregation group-by query
+        QueryOptions queryOptions = new 
QueryOptions(queryContext.getQueryOptions());
         // new Combine operator only when GROUP_BY_MODE explicitly set to SQL
         if (queryOptions.isGroupByModeSQL()) {
           return new AggregationGroupByOrderByPlanNode(indexSegment, 
brokerRequest, _maxInitialResultHolderCapacity,
@@ -107,109 +123,62 @@ public class InstancePlanMakerImplV2 implements 
PlanMaker {
         return new AggregationGroupByPlanNode(indexSegment, brokerRequest, 
_maxInitialResultHolderCapacity,
             _numGroupsLimit);
       } else {
-        if (isFitForMetadataBasedPlan(brokerRequest, indexSegment)) {
-          return new MetadataBasedAggregationPlanNode(indexSegment, 
brokerRequest);
-        } else if (isFitForDictionaryBasedPlan(brokerRequest, indexSegment)) {
-          return new DictionaryBasedAggregationPlanNode(indexSegment, 
brokerRequest);
-        } else {
-          return new AggregationPlanNode(indexSegment, brokerRequest);
+        // Aggregation only query
+        if (queryContext.getFilter() == null) {
+          if (isFitForMetadataBasedPlan(queryContext)) {
+            return new MetadataBasedAggregationPlanNode(indexSegment, 
brokerRequest);
+          } else if (isFitForDictionaryBasedPlan(queryContext, indexSegment)) {
+            return new DictionaryBasedAggregationPlanNode(indexSegment, 
brokerRequest);
+          }
         }
+        return new AggregationPlanNode(indexSegment, brokerRequest);
       }
-    }
-    if (brokerRequest.isSetSelections()) {
+    } else {
+      // Selection query
       return new SelectionPlanNode(indexSegment, brokerRequest);
     }
-    throw new UnsupportedOperationException("The query contains no aggregation 
or selection.");
-  }
-
-  @Override
-  public Plan makeInterSegmentPlan(List<SegmentDataManager> 
segmentDataManagers, BrokerRequest brokerRequest,
-      ExecutorService executorService, long timeOutMs) {
-    // TODO: pass in List<IndexSegment> directly.
-    List<IndexSegment> indexSegments = new 
ArrayList<>(segmentDataManagers.size());
-    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-      indexSegments.add(segmentDataManager.getSegment());
-    }
-
-    List<PlanNode> planNodes = new ArrayList<>();
-    for (IndexSegment indexSegment : indexSegments) {
-      planNodes.add(makeInnerSegmentPlan(indexSegment, brokerRequest));
-    }
-    CombinePlanNode combinePlanNode =
-        new CombinePlanNode(planNodes, brokerRequest, executorService, 
timeOutMs, _numGroupsLimit);
-
-    return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
   }
 
   /**
-   * Helper method to identify if query is fit to be be served purely based on 
metadata.
-   * Currently count queries without any filters are supported.
-   * The code for supporting max and min is also in place, but disabled
-   * It would have worked only for time columns and offline and non star tree 
cases.
-   *
-   * @param brokerRequest Broker request
-   * @param indexSegment
-   * @return True if query can be served using metadata, false otherwise.
+   * Returns {@code true} if the given aggregation-only without filter 
QueryContext can be solved with segment metadata,
+   * {@code false} otherwise.
+   * <p>Aggregations supported: COUNT
    */
-  public static boolean isFitForMetadataBasedPlan(BrokerRequest brokerRequest, 
IndexSegment indexSegment) {
-    if (brokerRequest.getFilterQuery() != null || 
brokerRequest.isSetGroupBy()) {
-      return false;
-    }
-
-    List<AggregationInfo> aggregationsInfo = 
brokerRequest.getAggregationsInfo();
-    if (aggregationsInfo == null) {
-      return false;
-    }
-    for (AggregationInfo aggInfo : aggregationsInfo) {
-      if (!isMetadataBasedAggregationFunction(aggInfo, indexSegment)) {
+  @VisibleForTesting
+  static boolean isFitForMetadataBasedPlan(QueryContext queryContext) {
+    List<ExpressionContext> selectExpressions = 
queryContext.getSelectExpressions();
+    for (ExpressionContext expression : selectExpressions) {
+      if (!expression.getFunction().getFunctionName().equals("count")) {
         return false;
       }
     }
     return true;
   }
 
-  private static boolean isMetadataBasedAggregationFunction(AggregationInfo 
aggregationInfo,
-      IndexSegment indexSegment) {
-    return 
AggregationFunctionType.getAggregationFunctionType(aggregationInfo.getAggregationType())
-        == AggregationFunctionType.COUNT;
-  }
-
   /**
-   * Helper method to identify if query is fit to be be served purely based on 
dictionary.
-   * It can be served through dictionary only for min, max, minmaxrange 
queries as of now,
-   * and if a dictionary is present for the column
-   * @param brokerRequest Broker request
-   * @param indexSegment
-   * @return True if query can be served using dictionary, false otherwise.
+   * Returns {@code true} if the given aggregation-only without filter 
QueryContext can be solved with dictionary,
+   * {@code false} otherwise.
+   * <p>Aggregations supported: MIN, MAX, MINMAXRANGE
    */
-  public static boolean isFitForDictionaryBasedPlan(BrokerRequest 
brokerRequest, IndexSegment indexSegment) {
-    if ((brokerRequest.getFilterQuery() != null) || 
brokerRequest.isSetGroupBy()) {
-      return false;
-    }
-    List<AggregationInfo> aggregationsInfo = 
brokerRequest.getAggregationsInfo();
-    if (aggregationsInfo == null) {
-      return false;
-    }
-    for (AggregationInfo aggregationInfo : aggregationsInfo) {
-      if (!isDictionaryBasedAggregationFunction(aggregationInfo, 
indexSegment)) {
+  @VisibleForTesting
+  static boolean isFitForDictionaryBasedPlan(QueryContext queryContext, 
IndexSegment indexSegment) {
+    List<ExpressionContext> selectExpressions = 
queryContext.getSelectExpressions();
+    for (ExpressionContext expression : selectExpressions) {
+      FunctionContext function = expression.getFunction();
+      String functionName = function.getFunctionName();
+      if (!functionName.equals("min") && !functionName.equals("max") && 
!functionName.equals("minmaxrange")) {
         return false;
       }
-    }
-    return true;
-  }
-
-  private static boolean isDictionaryBasedAggregationFunction(AggregationInfo 
aggregationInfo,
-      IndexSegment indexSegment) {
-    AggregationFunctionType functionType =
-        
AggregationFunctionType.getAggregationFunctionType(aggregationInfo.getAggregationType());
-    if (functionType
-        .isOfType(AggregationFunctionType.MIN, AggregationFunctionType.MAX, 
AggregationFunctionType.MINMAXRANGE)) {
-      String column = 
AggregationFunctionUtils.getArguments(aggregationInfo).get(0);
-      if (indexSegment.getColumnNames().contains(column)) {
-        Dictionary dictionary = 
indexSegment.getDataSource(column).getDictionary();
-        return dictionary != null && dictionary.isSorted();
+      ExpressionContext argument = function.getArguments().get(0);
+      if (argument.getType() != ExpressionContext.Type.IDENTIFIER) {
+        return false;
+      }
+      String column = argument.getIdentifier();
+      Dictionary dictionary = 
indexSegment.getDataSource(column).getDictionary();
+      if (dictionary == null || !dictionary.isSorted()) {
+        return false;
       }
     }
-    return false;
+    return true;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
index 9170616..553a6d8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
@@ -20,36 +20,27 @@ package org.apache.pinot.core.plan.maker;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
 
 
 /**
- * The <code>PlanMaker</code> provides interfaces to make segment level and 
instance level execution plan.
+ * The {@code PlanMaker} makes logical execution plan for the queries.
  */
[email protected]
 public interface PlanMaker {
 
   /**
-   * Make segment level {@link PlanNode} which contains execution plan on only 
one segment.
-   *
-   * @param indexSegment index segment.
-   * @param brokerRequest broker request.
-   * @return segment level plan node.
+   * Returns an instance level {@link Plan} which contains the logical 
execution plan for multiple segments.
    */
-  PlanNode makeInnerSegmentPlan(IndexSegment indexSegment, BrokerRequest 
brokerRequest);
+  Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext 
queryContext, ExecutorService executorService,
+      long timeoutMs);
 
   /**
-   * Make instance level {@link Plan} which contains execution plan on 
multiple segments.
-   *
-   * @param segmentDataManagers list of segment data manager.
-   * @param brokerRequest broker request.
-   * @param executorService executor service.
-   * @param timeOutMs time out in milliseconds.
-   * @return instance level plan.
+   * Returns a segment level {@link PlanNode} which contains the logical 
execution plan for one segment.
    */
-  Plan makeInterSegmentPlan(List<SegmentDataManager> segmentDataManagers, 
BrokerRequest brokerRequest,
-      ExecutorService executorService, long timeOutMs);
+  PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext 
queryContext);
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index b420f5d..2f8be38 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.executor;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -204,9 +205,12 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
         metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
       } else {
         TimerContext.Timer planBuildTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
-        Plan globalQueryPlan = _planMaker
-            .makeInterSegmentPlan(segmentDataManagers, 
queryContext.getBrokerRequest(), executorService,
-                remainingTimeMs);
+        List<IndexSegment> indexSegments = new 
ArrayList<>(numSegmentsMatchedAfterPruning);
+        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+          indexSegments.add(segmentDataManager.getSegment());
+        }
+        Plan globalQueryPlan =
+            _planMaker.makeInstancePlan(indexSegments, queryContext, 
executorService, remainingTimeMs);
         planBuildTimer.stopAndRecord();
 
         TimerContext.Timer planExecTimer = 
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 13d2088..591cba0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -36,6 +36,8 @@ import 
org.apache.pinot.core.plan.DictionaryBasedAggregationPlanNode;
 import org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
 import org.apache.pinot.core.plan.PlanNode;
 import org.apache.pinot.core.plan.SelectionPlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -46,7 +48,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeClass;
@@ -54,6 +55,10 @@ import org.testng.annotations.BeforeTest;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
 
 public class MetadataAndDictionaryAggregationPlanMakerTest {
   private static final String AVRO_DATA = "data" + File.separator + 
"test_data-sv.avro";
@@ -72,7 +77,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
 
     // Get resource file path.
     URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
-    Assert.assertNotNull(resource);
+    assertNotNull(resource);
     String filePath = resource.getFile();
 
     // Build the segment schema.
@@ -128,8 +133,9 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
   @Test(dataProvider = "testPlanNodeMakerDataProvider")
   public void testInstancePlanMakerForMetadataAndDictionaryPlan(String query, 
Class<? extends PlanNode> planNodeClass) {
     BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
-    PlanNode plan = PLAN_MAKER.makeInnerSegmentPlan(_indexSegment, 
brokerRequest);
-    Assert.assertTrue(planNodeClass.isInstance(plan));
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    PlanNode plan = PLAN_MAKER.makeSegmentPlanNode(_indexSegment, 
queryContext);
+    assertTrue(planNodeClass.isInstance(plan));
   }
 
   @DataProvider(name = "testPlanNodeMakerDataProvider")
@@ -171,18 +177,15 @@ public class 
MetadataAndDictionaryAggregationPlanMakerTest {
   public void testIsFitFor(String query, IndexSegment indexSegment, boolean 
expectedIsFitForMetadata,
       boolean expectedIsFitForDictionary) {
     BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
-
-    boolean isFitForMetadataBasedPlan = 
InstancePlanMakerImplV2.isFitForMetadataBasedPlan(brokerRequest, indexSegment);
-    boolean isFitForDictionaryBasedPlan =
-        InstancePlanMakerImplV2.isFitForDictionaryBasedPlan(brokerRequest, 
indexSegment);
-    Assert.assertEquals(isFitForMetadataBasedPlan, expectedIsFitForMetadata);
-    Assert.assertEquals(isFitForDictionaryBasedPlan, 
expectedIsFitForDictionary);
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    
assertEquals(InstancePlanMakerImplV2.isFitForMetadataBasedPlan(queryContext), 
expectedIsFitForMetadata);
+    
assertEquals(InstancePlanMakerImplV2.isFitForDictionaryBasedPlan(queryContext, 
indexSegment),
+        expectedIsFitForDictionary);
   }
 
   @DataProvider(name = "isFitForPlanDataProvider")
   public Object[][] provideDataForIsFitChecks() {
     List<Object[]> entries = new ArrayList<>();
-    entries.add(new Object[]{"select * from testTable", _indexSegment, false, 
false});
     entries.add(
         new Object[]{"select count(*) from testTable", _indexSegment, true, 
false /* count* from metadata, even if star tree present */});
     entries.add(
@@ -192,11 +195,6 @@ public class MetadataAndDictionaryAggregationPlanMakerTest 
{
     entries.add(
         new Object[]{"select count(*),max(daysSinceEpoch) from testTable", 
_indexSegment, false, false /* count* and max(time) from metadata*/});
     entries.add(new Object[]{"select sum(column1) from testTable", 
_indexSegment, false, false});
-    entries.add(new Object[]{"select count(*) from testTable group by 
daysSinceEpoch", _indexSegment, false, false});
-    entries.add(new Object[]{"select count(*) from testTable where 
daysSinceEpoch > 1", _indexSegment, false, false});
-    entries.add(
-        new Object[]{"select max(column5) from testTable where daysSinceEpoch 
> 100", _indexSegment, false, false});
-    entries.add(new Object[]{"select column1 from testTable", _indexSegment, 
false, false});
     return entries.toArray(new Object[entries.size()][]);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
index add44b5..e08b4f0 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
@@ -24,19 +24,17 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
@@ -76,7 +74,7 @@ public abstract class BaseMultiValueQueriesTest extends 
BaseQueriesTest {
 
   private IndexSegment _indexSegment;
   // Contains 2 identical index segments.
-  private List<SegmentDataManager> _segmentDataManagers;
+  private List<IndexSegment> _indexSegments;
 
   @BeforeTest
   public void buildSegment()
@@ -96,8 +94,7 @@ public abstract class BaseMultiValueQueriesTest extends 
BaseQueriesTest {
         .addMultiValueDimension("column7", FieldSpec.DataType.INT)
         .addSingleValueDimension("column8", 
FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
         .addMetric("column10", FieldSpec.DataType.INT)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, 
TimeUnit.DAYS, "daysSinceEpoch"), null)
-        .build();
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, 
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
 
@@ -125,8 +122,7 @@ public abstract class BaseMultiValueQueriesTest extends 
BaseQueriesTest {
       throws Exception {
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     _indexSegment = immutableSegment;
-    _segmentDataManagers = Arrays
-        .asList(new ImmutableSegmentDataManager(immutableSegment), new 
ImmutableSegmentDataManager(immutableSegment));
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
   @AfterClass
@@ -150,7 +146,7 @@ public abstract class BaseMultiValueQueriesTest extends 
BaseQueriesTest {
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 6b012d3..07db200 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -23,17 +23,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.common.utils.CommonConstants.Server;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.plan.maker.PlanMaker;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.spi.config.table.TableType;
@@ -53,183 +56,138 @@ public abstract class BaseQueriesTest {
 
   protected abstract IndexSegment getIndexSegment();
 
-  protected abstract List<SegmentDataManager> getSegmentDataManagers();
+  protected abstract List<IndexSegment> getIndexSegments();
 
   /**
-   * Run query on single index segment.
+   * Run PQL query on single index segment.
    * <p>Use this to test a single operator.
-   *
-   * @param query PQL query.
-   * @return query operator.
    */
-  @SuppressWarnings("unchecked")
-  protected <T extends Operator> T getOperatorForQuery(String query) {
-    return (T) PLAN_MAKER.makeInnerSegmentPlan(getIndexSegment(), 
PQL_COMPILER.compileToBrokerRequest(query)).run();
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  protected <T extends Operator> T getOperatorForQuery(String pqlQuery) {
+    BrokerRequest brokerRequest = 
PQL_COMPILER.compileToBrokerRequest(pqlQuery);
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(), 
queryContext).run();
   }
 
   /**
-   * Run query with hard-coded filter on single index segment.
+   * Run PQL query with hard-coded filter on single index segment.
    * <p>Use this to test a single operator.
-   *
-   * @param query PQL query without any filter.
-   * @return query operator.
    */
-  protected <T extends Operator> T getOperatorForQueryWithFilter(String query) 
{
-    return getOperatorForQuery(query + getFilter());
+  @SuppressWarnings("rawtypes")
+  protected <T extends Operator> T getOperatorForQueryWithFilter(String 
pqlQuery) {
+    return getOperatorForQuery(pqlQuery + getFilter());
   }
 
   /**
-   * Run query on multiple index segments with custom plan maker.
-   * <p>Use this to test the whole flow from server to broker.
-   * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query PQL query.
-   * @param planMaker Plan maker.
-   * @return broker response.
-   */
-  protected BrokerResponseNative getBrokerResponseForPqlQuery(String query, 
PlanMaker planMaker) {
-    return getBrokerResponseForPqlQuery(query, planMaker, null);
-  }
-
-  /**
-   * Run query on multiple index segments with custom plan maker and 
queryOptions.
+   * Run PQL query on multiple index segments.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query PQL query.
-   * @param planMaker Plan maker.
-   * @return broker response.
    */
-  private BrokerResponseNative getBrokerResponseForPqlQuery(String query, 
PlanMaker planMaker,
-      Map<String, String> queryOptions) {
-    BrokerRequest brokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
-    return getBrokerResponseForBrokerRequest(brokerRequest, planMaker, 
queryOptions);
+  protected BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery) 
{
+    return getBrokerResponseForPqlQuery(pqlQuery, PLAN_MAKER);
   }
 
   /**
-   * Run query on multiple index segments.
+   * Run PQL query with hard-coded filter on multiple index segments.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query SQL query.
-   * @return broker response.
    */
-  protected BrokerResponseNative getBrokerResponseForSqlQuery(String query) {
-    return getBrokerResponseForSqlQuery(query, PLAN_MAKER);
+  protected BrokerResponseNative getBrokerResponseForPqlQueryWithFilter(String 
pqlQuery) {
+    return getBrokerResponseForPqlQuery(pqlQuery + getFilter());
   }
 
   /**
-   * Run query on multiple index segments with custom plan maker.
+   * Run PQL query on multiple index segments with custom plan maker.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query SQL query.
-   * @param planMaker Plan maker.
-   * @return broker response.
    */
-  protected BrokerResponseNative getBrokerResponseForSqlQuery(String query, 
PlanMaker planMaker) {
-    HashMap<String, String> queryOptions = new HashMap<>();
-    queryOptions.put("groupByMode", "sql");
-    queryOptions.put("responseFormat", "sql");
-    return getBrokerResponseForSqlQuery(query, planMaker, queryOptions);
+  protected BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery, 
PlanMaker planMaker) {
+    return getBrokerResponseForPqlQuery(pqlQuery, planMaker, null);
   }
 
   /**
-   * Run query on multiple index segments with custom plan maker and 
queryOptions.
+   * Run PQL query on multiple index segments.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query SQL query.
-   * @param planMaker Plan maker.
-   * @return broker response.
    */
-  private BrokerResponseNative getBrokerResponseForSqlQuery(String query, 
PlanMaker planMaker,
-      Map<String, String> queryOptions) {
-    BrokerRequest brokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
-    return getBrokerResponseForBrokerRequest(brokerRequest, planMaker, 
queryOptions);
+  protected BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery,
+      @Nullable Map<String, String> extraQueryOptions) {
+    return getBrokerResponseForPqlQuery(pqlQuery, PLAN_MAKER, 
extraQueryOptions);
   }
 
   /**
-   * Run query on multiple index segments with custom plan maker and 
queryOptions.
+   * Run PQL query on multiple index segments with custom plan maker and 
queryOptions.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param brokerRequest broker request.
-   * @param planMaker Plan maker.
-   * @return broker response.
    */
-  private BrokerResponseNative getBrokerResponseForBrokerRequest(BrokerRequest 
brokerRequest, PlanMaker planMaker,
-      Map<String, String> queryOptions) {
-    Map<String, String> allQueryOptions = new HashMap<>();
-
-    if (queryOptions != null) {
-      allQueryOptions.putAll(queryOptions);
-    }
-    if (brokerRequest.getQueryOptions() != null) {
-      allQueryOptions.putAll(brokerRequest.getQueryOptions());
+  private BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery, 
PlanMaker planMaker,
+      @Nullable Map<String, String> extraQueryOptions) {
+    BrokerRequest brokerRequest = 
PQL_COMPILER.compileToBrokerRequest(pqlQuery);
+    if (extraQueryOptions != null) {
+      Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+      if (queryOptions != null) {
+        queryOptions.putAll(extraQueryOptions);
+      } else {
+        brokerRequest.setQueryOptions(extraQueryOptions);
+      }
     }
-    if (!allQueryOptions.isEmpty()) {
-      brokerRequest.setQueryOptions(allQueryOptions);
-    }
-
-    // Server side.
-    Plan plan = planMaker.makeInterSegmentPlan(getSegmentDataManagers(), 
brokerRequest, EXECUTOR_SERVICE,
-        Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
-    DataTable instanceResponse = plan.execute();
-
-    // Broker side.
-    BrokerReduceService brokerReduceService = new BrokerReduceService();
-    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
-    dataTableMap.put(new ServerRoutingInstance("localhost", 1234, 
TableType.OFFLINE), instanceResponse);
-    dataTableMap.put(new ServerRoutingInstance("localhost", 1234, 
TableType.REALTIME), instanceResponse);
-    return brokerReduceService.reduceOnDataTable(brokerRequest, dataTableMap, 
null);
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    return getBrokerResponse(queryContext, planMaker);
   }
 
   /**
-   * Run query on multiple index segments.
+   * Run SQL query on multiple index segments.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query PQL query.
-   * @return broker response.
    */
-  protected BrokerResponseNative getBrokerResponseForPqlQuery(String query) {
-    return getBrokerResponseForPqlQuery(query, PLAN_MAKER);
+  protected BrokerResponseNative getBrokerResponseForSqlQuery(String sqlQuery) 
{
+    return getBrokerResponseForSqlQuery(sqlQuery, PLAN_MAKER);
   }
 
   /**
-   * Run query on multiple index segments.
+   * Run SQL query with hard-coded filter on multiple index segments.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query PQL query.
-   * @return broker response.
    */
-  protected BrokerResponseNative getBrokerResponseForPqlQuery(String query, 
Map<String, String> queryOptions) {
-    return getBrokerResponseForPqlQuery(query, PLAN_MAKER, queryOptions);
+  protected BrokerResponseNative getBrokerResponseForSqlQueryWithFilter(String 
sqlQuery) {
+    return getBrokerResponseForSqlQuery(sqlQuery + getFilter());
   }
 
   /**
-   * Run query with hard-coded filter on multiple index segments.
+   * Run SQL query on multiple index segments with custom plan maker.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query PQL query without any filter.
-   * @return broker response.
    */
-  protected BrokerResponseNative getBrokerResponseForPqlQueryWithFilter(String 
query) {
-    return getBrokerResponseForPqlQuery(query + getFilter());
+  @SuppressWarnings("SameParameterValue")
+  protected BrokerResponseNative getBrokerResponseForSqlQuery(String sqlQuery, 
PlanMaker planMaker) {
+    BrokerRequest brokerRequest = 
SQL_COMPILER.compileToBrokerRequest(sqlQuery);
+    Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+    if (queryOptions == null) {
+      queryOptions = new HashMap<>();
+      brokerRequest.setQueryOptions(queryOptions);
+    }
+    queryOptions.put(Request.QueryOptionKey.GROUP_BY_MODE, Request.SQL);
+    queryOptions.put(Request.QueryOptionKey.RESPONSE_FORMAT, Request.SQL);
+    QueryContext queryContext = 
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+    return getBrokerResponse(queryContext, planMaker);
   }
 
   /**
-   * Run query with hard-coded filter on multiple index segments.
+   * Run query on multiple index segments with custom plan maker.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
-   *
-   * @param query SQL query without any filter.
-   * @return broker response.
    */
-  protected BrokerResponseNative getBrokerResponseForSqlQueryWithFilter(String 
query) {
-    return getBrokerResponseForSqlQuery(query + getFilter());
+  private BrokerResponseNative getBrokerResponse(QueryContext queryContext, 
PlanMaker planMaker) {
+    // Server side.
+    Plan plan = planMaker
+        .makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE, 
Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+    DataTable instanceResponse = plan.execute();
+
+    // Broker side.
+    BrokerReduceService brokerReduceService = new BrokerReduceService();
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    dataTableMap.put(new ServerRoutingInstance("localhost", 1234, 
TableType.OFFLINE), instanceResponse);
+    dataTableMap.put(new ServerRoutingInstance("localhost", 1234, 
TableType.REALTIME), instanceResponse);
+    return 
brokerReduceService.reduceOnDataTable(queryContext.getBrokerRequest(), 
dataTableMap, null);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
index e46e87f..4ec7fbc 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
@@ -26,8 +26,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -70,7 +68,7 @@ public abstract class BaseSingleValueQueriesTest extends 
BaseQueriesTest {
   private static final String AVRO_DATA = "data" + File.separator + 
"test_data-sv.avro";
   private static final String SEGMENT_NAME = "testTable_126164076_167572854";
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"SingleValueQueriesTest");
-  private static final int NUM_SEGMENT_DATA_MANAGERS = 2;
+  private static final int NUM_SEGMENTS = 2;
 
   // Hard-coded query filter.
   private static final String QUERY_FILTER =
@@ -79,7 +77,7 @@ public abstract class BaseSingleValueQueriesTest extends 
BaseQueriesTest {
 
   private IndexSegment _indexSegment;
   // Contains 2 identical index segments.
-  private List<SegmentDataManager> _segmentDataManagers;
+  private List<IndexSegment> _indexSegments;
 
   @BeforeTest
   public void buildSegment()
@@ -100,8 +98,7 @@ public abstract class BaseSingleValueQueriesTest extends 
BaseQueriesTest {
         .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column12", 
FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
         .addMetric("column18", FieldSpec.DataType.INT)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, 
TimeUnit.DAYS, "daysSinceEpoch"), null)
-        .build();
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, 
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
 
@@ -130,15 +127,15 @@ public abstract class BaseSingleValueQueriesTest extends 
BaseQueriesTest {
       throws Exception {
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     _indexSegment = immutableSegment;
-    int numSegmentDataManagers = getNumSegmentDataManagers();
-    _segmentDataManagers = new ArrayList<>(numSegmentDataManagers);
-    for (int i = 0; i < numSegmentDataManagers; i++) {
-      _segmentDataManagers.add(new 
ImmutableSegmentDataManager(immutableSegment));
+    int numSegments = getNumSegments();
+    _indexSegments = new ArrayList<>(numSegments);
+    for (int i = 0; i < numSegments; i++) {
+      _indexSegments.add(immutableSegment);
     }
   }
 
-  protected int getNumSegmentDataManagers() {
-    return NUM_SEGMENT_DATA_MANAGERS;
+  protected int getNumSegments() {
+    return NUM_SEGMENTS;
   }
 
   @AfterClass
@@ -162,7 +159,7 @@ public abstract class BaseSingleValueQueriesTest extends 
BaseQueriesTest {
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
index 6b805df..5ca02e1 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import joptsimple.internal.Strings;
 import org.apache.commons.io.FileUtils;
@@ -37,8 +36,6 @@ import 
org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -68,21 +65,20 @@ import org.testng.annotations.Test;
  * </ul>
  */
 public class DistinctCountThetaSketchTest extends BaseQueriesTest {
-  protected static final File INDEX_DIR = new 
File(FileUtils.getTempDirectory(), "DistinctCountThetaSketchTest");
-  protected static final String TABLE_NAME = "testTable";
-  protected static final String SEGMENT_NAME = "testSegment";
-
-  protected static final int NUM_ROWS = 1001;
-  protected static final long RANDOM_SEED = System.nanoTime();
-
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"DistinctCountThetaSketchTest");
+  private static final String TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
   private static final String THETA_SKETCH_COLUMN = "colTS";
   private static final String DISTINCT_COLUMN = "distinctColumn";
 
-  private static Random RANDOM = new Random(RANDOM_SEED);
-  protected static final int MAX_CARDINALITY = 5; // 3 columns will lead to at 
most 125 groups
+  private static final int NUM_ROWS = 1001;
+  private static final int MAX_CARDINALITY = 5; // 3 columns will lead to at 
most 125 groups
+
+  private static final long RANDOM_SEED = System.nanoTime();
+  private static final Random RANDOM = new Random(RANDOM_SEED);
 
-  private ImmutableSegment _indexSegment;
-  private List<SegmentDataManager> _segmentDataManagers;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
 
   @BeforeClass
   public void setup()
@@ -90,9 +86,9 @@ public class DistinctCountThetaSketchTest extends 
BaseQueriesTest {
     FileUtils.deleteQuietly(INDEX_DIR);
 
     File segmentFile = buildSegment(buildSchema());
-    _indexSegment = ImmutableSegmentLoader.load(segmentFile, ReadMode.mmap);
-    _segmentDataManagers =
-        Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new 
ImmutableSegmentDataManager(_indexSegment));
+    ImmutableSegment immutableSegment = 
ImmutableSegmentLoader.load(segmentFile, ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
   @AfterClass
@@ -179,12 +175,11 @@ public class DistinctCountThetaSketchTest extends 
BaseQueriesTest {
   }
 
   private void testQuery(String tsQuery, String distinctQuery, boolean 
groupBy, boolean sql, boolean raw) {
-    Map<String, String> queryOptions = Collections.emptyMap();
     BrokerResponseNative actualResponse =
-        (sql) ? getBrokerResponseForSqlQuery(tsQuery) : 
getBrokerResponseForPqlQuery(tsQuery, queryOptions);
+        (sql) ? getBrokerResponseForSqlQuery(tsQuery) : 
getBrokerResponseForPqlQuery(tsQuery);
 
     BrokerResponseNative expectedResponse =
-        (sql) ? getBrokerResponseForSqlQuery(distinctQuery) : 
getBrokerResponseForPqlQuery(distinctQuery, queryOptions);
+        (sql) ? getBrokerResponseForSqlQuery(distinctQuery) : 
getBrokerResponseForPqlQuery(distinctQuery);
 
     if (groupBy) {
       compareGroupBy(actualResponse, expectedResponse, sql, raw);
@@ -328,8 +323,8 @@ public class DistinctCountThetaSketchTest extends 
BaseQueriesTest {
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 
   protected File buildSegment(Schema schema)
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 85b39ba..a44d0b7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -40,8 +40,6 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -51,6 +49,8 @@ import 
org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.operator.query.AggregationOperator;
 import 
org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -95,7 +95,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
 
   private IndexSegment _indexSegment;
-  private List<SegmentDataManager> _segmentDataManagers;
+  private List<IndexSegment> _indexSegments;
 
   @Override
   protected String getFilter() {
@@ -108,8 +108,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 
   @BeforeClass
@@ -351,8 +351,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       throws Exception {
     ImmutableSegment segment0 = createSegment(0, generateRecords(0));
     ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
-    _segmentDataManagers =
-        Arrays.asList(new ImmutableSegmentDataManager(segment0), new 
ImmutableSegmentDataManager(segment1));
+    _indexSegments = Arrays.asList(segment0, segment1);
     try {
       {
         // Test selecting all columns
@@ -605,10 +604,12 @@ public class DistinctQueriesTest extends BaseQueriesTest {
             "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 
200 ORDER BY longColumn DESC LIMIT 5";
 
         BrokerRequest pqlBrokerRequest = 
PQL_COMPILER.compileToBrokerRequest(pqlQuery);
-        BrokerResponseNative pqlResponse = 
queryServersWithDifferentSegments(pqlBrokerRequest, segment0, segment1);
+        QueryContext pqlQueryContext = 
BrokerRequestToQueryContextConverter.convert(pqlBrokerRequest);
+        BrokerResponseNative pqlResponse = 
queryServersWithDifferentSegments(pqlQueryContext, segment0, segment1);
         BrokerRequest sqlBrokerRequest = 
SQL_COMPILER.compileToBrokerRequest(sqlQuery);
         
sqlBrokerRequest.setQueryOptions(Collections.singletonMap("responseFormat", 
"sql"));
-        BrokerResponseNative sqlResponse = 
queryServersWithDifferentSegments(sqlBrokerRequest, segment0, segment1);
+        QueryContext sqlQueryContext = 
BrokerRequestToQueryContextConverter.convert(sqlBrokerRequest);
+        BrokerResponseNative sqlResponse = 
queryServersWithDifferentSegments(sqlQueryContext, segment0, segment1);
 
         // Check data schema
         SelectionResults selectionResults = pqlResponse.getSelectionResults();
@@ -636,8 +637,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         }
       }
     } finally {
-      for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
-        segmentDataManager.destroy();
+      for (IndexSegment indexSegment : _indexSegments) {
+        indexSegment.destroy();
       }
     }
   }
@@ -646,24 +647,21 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * Helper method to query 2 servers with different segments. Server0 will 
have 2 copies of segment0; Server1 will have
    * 2 copies of segment1.
    */
-  private BrokerResponseNative queryServersWithDifferentSegments(BrokerRequest 
brokerRequest, ImmutableSegment segment0,
+  private BrokerResponseNative queryServersWithDifferentSegments(QueryContext 
queryContext, ImmutableSegment segment0,
       ImmutableSegment segment1) {
-    List<SegmentDataManager> segmentDataManagers0 =
-        Arrays.asList(new ImmutableSegmentDataManager(segment0), new 
ImmutableSegmentDataManager(segment0));
-    List<SegmentDataManager> segmentDataManagers1 =
-        Arrays.asList(new ImmutableSegmentDataManager(segment1), new 
ImmutableSegmentDataManager(segment1));
-
     // Server side
-    DataTable instanceResponse0 = 
PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers0, brokerRequest, 
EXECUTOR_SERVICE,
-        CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
-    DataTable instanceResponse1 = 
PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers1, brokerRequest, 
EXECUTOR_SERVICE,
-        CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+    DataTable instanceResponse0 = PLAN_MAKER
+        .makeInstancePlan(Arrays.asList(segment0, segment0), queryContext, 
EXECUTOR_SERVICE,
+            
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+    DataTable instanceResponse1 = PLAN_MAKER
+        .makeInstancePlan(Arrays.asList(segment1, segment1), queryContext, 
EXECUTOR_SERVICE,
+            
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
 
     // Broker side
     BrokerReduceService brokerReduceService = new BrokerReduceService();
     Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
     dataTableMap.put(new ServerRoutingInstance("localhost", 1234, 
TableType.OFFLINE), instanceResponse0);
     dataTableMap.put(new ServerRoutingInstance("localhost", 1234, 
TableType.REALTIME), instanceResponse1);
-    return brokerReduceService.reduceOnDataTable(brokerRequest, dataTableMap, 
null);
+    return 
brokerReduceService.reduceOnDataTable(queryContext.getBrokerRequest(), 
dataTableMap, null);
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
index 41f7b9e..386ae27 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
@@ -27,8 +27,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -85,7 +83,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
 
   private IndexSegment _indexSegment;
   // Contains 2 identical index segments
-  private List<SegmentDataManager> _segmentDataManagers;
+  private List<IndexSegment> _indexSegments;
 
   @Override
   protected String getFilter() {
@@ -98,8 +96,8 @@ public class FastHllQueriesTest extends BaseQueriesTest {
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 
   @Test
@@ -196,8 +194,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
 
     ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
     _indexSegment = immutableSegment;
-    _segmentDataManagers = Arrays
-        .asList(new ImmutableSegmentDataManager(immutableSegment), new 
ImmutableSegmentDataManager(immutableSegment));
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
   private void deleteSegment() {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
index dcae9e7..d1e7023 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
@@ -31,21 +31,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -57,6 +47,14 @@ import 
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggrega
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -92,8 +90,8 @@ public class PercentileTDigestQueriesTest extends 
BaseQueriesTest {
   protected static final Random RANDOM = new Random(RANDOM_SEED);
   protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
 
-  private ImmutableSegment _indexSegment;
-  private List<SegmentDataManager> _segmentDataManagers;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
 
   @Override
   protected String getFilter() {
@@ -106,8 +104,8 @@ public class PercentileTDigestQueriesTest extends 
BaseQueriesTest {
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 
   @BeforeClass
@@ -116,9 +114,9 @@ public class PercentileTDigestQueriesTest extends 
BaseQueriesTest {
     FileUtils.deleteQuietly(INDEX_DIR);
 
     buildSegment();
-    _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), ReadMode.mmap);
-    _segmentDataManagers =
-        Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new 
ImmutableSegmentDataManager(_indexSegment));
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
   protected void buildSegment()
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
index 2f34a01..99f3a90 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.queries;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Random;
@@ -27,7 +28,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.common.utils.Pairs;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -43,66 +43,37 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
-public class RangePredicateWithSortedInvertedIndexTest extends BaseQueriesTest 
{
-  private static final int NUM_ROWS = 30000;
 
-  private List<GenericRow> _rows = new ArrayList<>();
+public class RangePredicateWithSortedInvertedIndexTest extends BaseQueriesTest 
{
+  private static final File INDEX_DIR =
+      new File(FileUtils.getTempDirectory(), 
"RangePredicateWithSortedInvertedIndexTest");
+  private static final String TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
 
   private static final String D1 = "STRING_COL";
   private static final String M1 = "INT_COL"; // sorted column
   private static final String M2 = "LONG_COL";
 
+  private static final int NUM_ROWS = 30000;
   private static final int INT_BASE_VALUE = 0;
 
-  private static final String TABLE_NAME = "TestTable";
-  private static final int NUM_SEGMENTS = 1;
-  private static final String SEGMENT_NAME_1 = TABLE_NAME + 
"_100000000_200000000";
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"SortedRangeTest");
-
-  private List<IndexSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS);
-  private final String[] stringValues = new String[NUM_ROWS];
-  private final long[] longValues = new long[NUM_ROWS];
-
-  private Schema _schema;
-  private TableConfig _tableConfig;
+  private static final long RANDOM_SEED = System.nanoTime();
+  private static final Random RANDOM = new Random(RANDOM_SEED);
+  private static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
 
-  @BeforeClass
-  public void setUp() {
-    createPinotTableSchema();
-    createTestData();
-  }
+  private final String[] _stringValues = new String[NUM_ROWS];
+  private final long[] _longValues = new long[NUM_ROWS];
 
-  @AfterClass
-  public void tearDown() {
-    FileUtils.deleteQuietly(INDEX_DIR);
-  }
-
-  private void createPinotTableSchema() {
-    _schema =
-        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
-            .addMetric(M1, FieldSpec.DataType.INT).addMetric(M2, 
FieldSpec.DataType.LONG).build();
-    _tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
-  }
-
-  private void createTestData() {
-    Random random = new Random();
-    for (int rowIndex = 0; rowIndex < NUM_ROWS; rowIndex++) {
-      GenericRow row = new GenericRow();
-      stringValues[rowIndex] = RandomStringUtils.randomAlphanumeric(10);
-      row.putValue(D1, stringValues[rowIndex]);
-      row.putValue(M1, INT_BASE_VALUE + rowIndex);
-      longValues[rowIndex] = random.nextLong();
-      row.putValue(M2, longValues[rowIndex]);
-      _rows.add(row);
-    }
-  }
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
 
   @Override
   protected String getFilter() {
@@ -111,121 +82,134 @@ public class RangePredicateWithSortedInvertedIndexTest 
extends BaseQueriesTest {
 
   @Override
   protected IndexSegment getIndexSegment() {
-    return _indexSegments.get(0);
+    return _indexSegment;
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return null;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 
-  private void createSegment(TableConfig tableConfig, Schema schema, 
RecordReader recordReader, String segmentName,
-      String tableName)
+  @BeforeClass
+  public void setUp()
       throws Exception {
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
-    segmentGeneratorConfig.setTableName(tableName);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentName(segmentName);
+    FileUtils.deleteQuietly(INDEX_DIR);
 
-    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, recordReader);
-    driver.build();
+    buildSegment();
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
 
-    File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), segmentName);
-    if (!segmentIndexDir.exists()) {
-      throw new IllegalStateException("Segment generation failed");
-    }
+  @AfterClass
+  public void tearDown() {
+    _indexSegment.destroy();
+    FileUtils.deleteQuietly(INDEX_DIR);
   }
 
-  private ImmutableSegment loadSegment(String segmentName)
+  private void buildSegment()
       throws Exception {
-    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), 
ReadMode.heap);
+    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+    for (int rowIndex = 0; rowIndex < NUM_ROWS; rowIndex++) {
+      GenericRow row = new GenericRow();
+      _stringValues[rowIndex] = RandomStringUtils.randomAlphanumeric(10);
+      row.putValue(D1, _stringValues[rowIndex]);
+      row.putValue(M1, INT_BASE_VALUE + rowIndex);
+      _longValues[rowIndex] = RANDOM.nextLong();
+      row.putValue(M2, _longValues[rowIndex]);
+      rows.add(row);
+    }
+
+    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
+            .addMetric(M1, FieldSpec.DataType.INT).addMetric(M2, 
FieldSpec.DataType.LONG).build();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
+    }
   }
 
   @Test
-  public void testInnerSegmentQuery()
-      throws Exception {
-    Random random = new Random();
-    try (RecordReader recordReader = new GenericRowRecordReader(_rows)) {
-      createSegment(_tableConfig, _schema, recordReader, SEGMENT_NAME_1, 
TABLE_NAME);
-      final ImmutableSegment immutableSegment = loadSegment(SEGMENT_NAME_1);
-      _indexSegments.add(immutableSegment);
-
-      String query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL 
>= 20000 LIMIT 100000";
-      Pairs.IntPair pair = new Pairs.IntPair(20000, 29999);
-      runQuery(query, 10000, Lists.newArrayList(pair), 2);
-
-      query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL >= 
20000 AND INT_COL <= 23666 LIMIT 100000";
-      pair = new Pairs.IntPair(20000, 23666);
-      runQuery(query, 3667, Lists.newArrayList(pair), 2);
-
-      query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL <= 
20000 LIMIT 100000";
-      pair = new Pairs.IntPair(0, 20000);
-      runQuery(query, 20001, Lists.newArrayList(pair), 2);
-
-      String filter = "WHERE (INT_COL >= 15000 AND INT_COL <= 16665) OR 
(INT_COL >= 18000 AND INT_COL <= 19887)";
-      query = "SELECT STRING_COL, INT_COL FROM TestTable " + filter + " LIMIT 
100000";
-      pair = new Pairs.IntPair(15000, 16665);
-      Pairs.IntPair pair1 = new Pairs.IntPair(18000, 19987);
-      runQuery(query, 3554, Lists.newArrayList(pair, pair1), 2);
-
-      // range predicate on sorted column which will use sorted inverted index 
based iterator
-      // along with range predicate on unsorted column that uses scan based 
iterator
-      int index = random.nextInt(NUM_ROWS + 1);
-      long longPredicateValue = longValues[index];
-      int count = 0;
-      List<Pairs.IntPair> pairs = new ArrayList<>();
-      Pairs.IntPair current = null;
-      for (int i = 0 ; i < longValues.length; i++) {
-        if (longValues[i] >= longPredicateValue && i >= 15000 && i <= 16665) {
-          if (current == null) {
-            current = new Pairs.IntPair(i, i);
+  public void testInnerSegmentQuery() {
+    String query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL >= 
20000 LIMIT 100000";
+    Pairs.IntPair pair = new Pairs.IntPair(20000, 29999);
+    runQuery(query, 10000, Lists.newArrayList(pair), 2);
+
+    query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL >= 20000 
AND INT_COL <= 23666 LIMIT 100000";
+    pair = new Pairs.IntPair(20000, 23666);
+    runQuery(query, 3667, Lists.newArrayList(pair), 2);
+
+    query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL <= 20000 
LIMIT 100000";
+    pair = new Pairs.IntPair(0, 20000);
+    runQuery(query, 20001, Lists.newArrayList(pair), 2);
+
+    String filter = "WHERE (INT_COL >= 15000 AND INT_COL <= 16665) OR (INT_COL 
>= 18000 AND INT_COL <= 19887)";
+    query = "SELECT STRING_COL, INT_COL FROM testTable " + filter + " LIMIT 
100000";
+    pair = new Pairs.IntPair(15000, 16665);
+    Pairs.IntPair pair1 = new Pairs.IntPair(18000, 19887);
+    runQuery(query, 3554, Lists.newArrayList(pair, pair1), 2);
+
+    // range predicate on sorted column which will use sorted inverted index 
based iterator
+    // along with range predicate on unsorted column that uses scan based 
iterator
+    int index = RANDOM.nextInt(NUM_ROWS);
+    long longPredicateValue = _longValues[index];
+    int count = 0;
+    List<Pairs.IntPair> pairs = new ArrayList<>();
+    Pairs.IntPair current = null;
+    for (int i = 0; i < _longValues.length; i++) {
+      if (_longValues[i] >= longPredicateValue && i >= 15000 && i <= 16665) {
+        if (current == null) {
+          current = new Pairs.IntPair(i, i);
+        } else {
+          if (i == current.getRight() + 1) {
+            current.setRight(i);
           } else {
-            if (i == current.getRight() + 1) {
-              current.setRight(i);
-            } else {
-              if (i <= longValues.length - 2) {
-                pairs.add(current);
-                current = new Pairs.IntPair(i, i);
-              }
+            if (i <= _longValues.length - 2) {
+              pairs.add(current);
+              current = new Pairs.IntPair(i, i);
             }
           }
-          count++;
         }
+        count++;
       }
-      pairs.add(current);
-      filter = "WHERE INT_COL >= 15000 AND INT_COL <= 16665 AND LONG_COL >= " 
+ longPredicateValue;
-      query = "SELECT STRING_COL, INT_COL, LONG_COL FROM TestTable " + filter 
+ " LIMIT 100000";
-      runQuery(query, count, pairs, 3);
-
-      // empty resultset
-      query = "SELECT STRING_COL, INT_COL FROM TestTable WHERe INT_COL < 0 
LIMIT 100000";
-      runQuery(query, 0, null, 0);
-
-      // empty resultset
-      query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL > 30000 
LIMIT 100000";
-      runQuery(query, 0, null, 0);
-    } finally {
-      destroySegments();
     }
+    pairs.add(current);
+    filter = "WHERE INT_COL >= 15000 AND INT_COL <= 16665 AND LONG_COL >= " + 
longPredicateValue;
+    query = "SELECT STRING_COL, INT_COL, LONG_COL FROM testTable " + filter + 
" LIMIT 100000";
+    runQuery(query, count, pairs, 3);
+
+    // empty resultset
+    query = "SELECT STRING_COL, INT_COL FROM testTable WHERe INT_COL < 0 LIMIT 
100000";
+    runQuery(query, 0, null, 0);
+
+    // empty resultset
+    query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL > 30000 
LIMIT 100000";
+    runQuery(query, 0, null, 0);
   }
 
   private void runQuery(String query, int count, List<Pairs.IntPair> intPairs, 
int numColumns) {
     SelectionOnlyOperator operator = getOperatorForQuery(query);
     IntermediateResultsBlock block = operator.nextBlock();
     Collection<Object[]> rows = block.getSelectionResult();
-    Assert.assertNotNull(rows);
-    Assert.assertEquals(rows.size(), count);
+    assertNotNull(rows, ERROR_MESSAGE);
+    assertEquals(rows.size(), count, ERROR_MESSAGE);
     if (count > 0) {
       Pairs.IntPair pair = intPairs.get(0);
       int startPos = pair.getLeft();
       int pairPos = 0;
       for (Object[] row : rows) {
-        Assert.assertEquals(numColumns, row.length);
-        Assert.assertEquals(row[0], stringValues[startPos]);
-        Assert.assertEquals(row[1], startPos);
+        assertEquals(numColumns, row.length, ERROR_MESSAGE);
+        assertEquals(row[0], _stringValues[startPos], ERROR_MESSAGE);
+        assertEquals(row[1], startPos, ERROR_MESSAGE);
         if (numColumns == 3) {
-          Assert.assertEquals(row[2], longValues[startPos]);
+          assertEquals(row[2], _longValues[startPos], ERROR_MESSAGE);
         }
         startPos++;
         if (startPos > pair.getRight() && pairPos <= intPairs.size() - 2) {
@@ -236,13 +220,4 @@ public class RangePredicateWithSortedInvertedIndexTest 
extends BaseQueriesTest {
       }
     }
   }
-
-  private void destroySegments() {
-    for (IndexSegment indexSegment : _indexSegments) {
-      if (indexSegment != null) {
-        indexSegment.destroy();
-      }
-    }
-    _indexSegments.clear();
-  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
index d6b89cf..b7df980 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
@@ -40,7 +40,7 @@ public class SelectionOnlyEarlyTerminationTest extends 
BaseSingleValueQueriesTes
    * (2 * MAX_NUM_THREADS_PER_QUERY) segments per server.
    */
   @Override
-  protected int getNumSegmentDataManagers() {
+  protected int getNumSegments() {
     return CombineOperator.MAX_NUM_THREADS_PER_QUERY * 2;
   }
 
@@ -52,7 +52,7 @@ public class SelectionOnlyEarlyTerminationTest extends 
BaseSingleValueQueriesTes
   @Test
   public void testSelectOnlyQuery() {
     int numThreadsPerServer = CombineOperator.MAX_NUM_THREADS_PER_QUERY;
-    int numSegmentsPerServer = getNumSegmentDataManagers();
+    int numSegmentsPerServer = getNumSegments();
 
     // LIMIT = 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480
     for (int limit = 5; limit < NUM_DOCS_PER_SEGMENT; limit *= 2) {
@@ -95,7 +95,7 @@ public class SelectionOnlyEarlyTerminationTest extends 
BaseSingleValueQueriesTes
    */
   @Test
   public void testSelectWithOrderByQuery() {
-    int numSegmentsPerServer = getNumSegmentDataManagers();
+    int numSegmentsPerServer = getNumSegments();
     String query = "SELECT column11, column18, column1 FROM testTable ORDER BY 
column11";
     int numColumnsInSelection = 3;
     BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index 29300ad..b1572b2 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -29,20 +29,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -56,6 +48,12 @@ import 
org.apache.pinot.core.query.aggregation.function.customobject.QuantileDig
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -111,8 +109,8 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
   private final QuantileDigest[] _quantileDigests = new 
QuantileDigest[NUM_ROWS];
   private final TDigest[] _tDigests = new TDigest[NUM_ROWS];
 
-  private ImmutableSegment _indexSegment;
-  private List<SegmentDataManager> _segmentDataManagers;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
 
   @Override
   protected String getFilter() {
@@ -125,8 +123,8 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 
   @BeforeClass
@@ -135,9 +133,9 @@ public class SerializedBytesQueriesTest extends 
BaseQueriesTest {
     FileUtils.deleteQuietly(INDEX_DIR);
 
     buildSegment();
-    _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), ReadMode.mmap);
-    _segmentDataManagers =
-        Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new 
ImmutableSegmentDataManager(_indexSegment));
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
   private void buildSegment()
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index da29a42..e8ab6d4 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -51,8 +51,6 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -63,6 +61,7 @@ import 
org.apache.pinot.core.operator.query.AggregationOperator;
 import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -84,33 +83,20 @@ import org.testng.annotations.Test;
  * The test table has a SKILLS column and QUERY_LOG column. Text index is 
created
  * on each of these columns.
  */
-public class TextSearchQueriesTest extends BaseQueriesTest {
-
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"TextSearchQueries");
+public class TextSearchQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "TextSearchQueriesTest");
   private static final String TABLE_NAME = "MyTable";
-  private static final String SEGMENT_NAME = TABLE_NAME + 
"_100000000_200000000";
+  private static final String SEGMENT_NAME = "testSegment";
+
   private static final String QUERY_LOG_TEXT_COL_NAME = "QUERY_LOG_TEXT_COL";
   private static final String SKILLS_TEXT_COL_NAME = "SKILLS_TEXT_COL";
   private static final String INT_COL_NAME = "INT_COL";
-  private static final List<String> textIndexColumns = new ArrayList<>();
+  private static final List<String> TEXT_INDEX_COLUMNS = 
Arrays.asList(QUERY_LOG_TEXT_COL_NAME, SKILLS_TEXT_COL_NAME);
   private static final int INT_BASE_VALUE = 1000;
-  private List<GenericRow> _rows = new ArrayList<>();
-  private RecordReader _recordReader;
-  Schema _schema;
-  private TableConfig _tableConfig;
 
-  private List<IndexSegment> _indexSegments = new ArrayList<>(1);
-  private List<SegmentDataManager> _segmentDataManagers = new ArrayList<>();
+  private final List<GenericRow> _rows = new ArrayList<>();
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    createPinotTableSchema();
-    createTestData();
-    _recordReader = new GenericRowRecordReader(_rows);
-    createSegment();
-    loadSegment();
-  }
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
 
   @Override
   protected String getFilter() {
@@ -119,67 +105,66 @@ public class TextSearchQueriesTest extends 
BaseQueriesTest {
 
   @Override
   protected IndexSegment getIndexSegment() {
-    return _indexSegments.get(0);
+    return _indexSegment;
   }
 
   @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment();
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setTextIndexColumns(new HashSet<>(TEXT_INDEX_COLUMNS));
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), 
indexLoadingConfig);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
   @AfterClass
   public void tearDown() {
-    for (IndexSegment indexSegment : _indexSegments) {
-      if (indexSegment != null) {
-        indexSegment.destroy();
-      }
-    }
-    _indexSegments.clear();
+    _indexSegment.destroy();
     FileUtils.deleteQuietly(INDEX_DIR);
   }
 
-  private void createPinotTableSchema() {
-    _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+  private void buildSegment()
+      throws Exception {
+    List<GenericRow> rows = createTestData();
+
+    List<FieldConfig> fieldConfigs = new 
ArrayList<>(TEXT_INDEX_COLUMNS.size());
+    for (String textIndexColumn : TEXT_INDEX_COLUMNS) {
+      fieldConfigs
+          .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW, 
FieldConfig.IndexType.TEXT, null));
+    }
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(TEXT_INDEX_COLUMNS)
+            .setFieldConfigList(fieldConfigs).build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addSingleValueDimension(QUERY_LOG_TEXT_COL_NAME, 
FieldSpec.DataType.STRING)
         .addSingleValueDimension(SKILLS_TEXT_COL_NAME, 
FieldSpec.DataType.STRING)
         .addMetric(INT_COL_NAME, FieldSpec.DataType.INT).build();
-    _tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
-  }
-
-  private void createSegment()
-      throws Exception {
-    textIndexColumns.add(QUERY_LOG_TEXT_COL_NAME);
-    textIndexColumns.add(SKILLS_TEXT_COL_NAME);
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(_tableConfig, _schema);
-    segmentGeneratorConfig.setTableName(TABLE_NAME);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
-    segmentGeneratorConfig.setRawIndexCreationColumns(textIndexColumns);
-    segmentGeneratorConfig.setTextIndexCreationColumns(textIndexColumns);
-    segmentGeneratorConfig.setSkipTimeValueCheck(true);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
 
     SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, _recordReader);
-    driver.build();
-
-    File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), SEGMENT_NAME);
-    if (!segmentIndexDir.exists()) {
-      throw new IllegalStateException("Segment generation failed");
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
     }
   }
 
-  private void loadSegment()
+  private List<GenericRow> createTestData()
       throws Exception {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
-    indexLoadingConfig.setTextIndexColumns(new HashSet<>(textIndexColumns));
-    ImmutableSegment segment = ImmutableSegmentLoader.load(new File(INDEX_DIR, 
SEGMENT_NAME), indexLoadingConfig);
-    _indexSegments.add(segment);
-    _segmentDataManagers =
-        Arrays.asList(new ImmutableSegmentDataManager(segment), new 
ImmutableSegmentDataManager(segment));
-  }
+    List<GenericRow> rows = new ArrayList<>();
 
-  private void createTestData()
-      throws Exception {
     // read the skills file
     URL resourceUrl = 
getClass().getClassLoader().getResource("data/text_search_data/skills.txt");
     File skillFile = new File(resourceUrl.getFile());
@@ -209,10 +194,12 @@ public class TextSearchQueriesTest extends 
BaseQueriesTest {
         } else {
           row.putField(SKILLS_TEXT_COL_NAME, skills[counter]);
         }
-        _rows.add(row);
+        rows.add(row);
         counter++;
       }
     }
+
+    return rows;
   }
 
   /**
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 2344fe1..0b5c9f3 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -25,23 +25,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -53,6 +42,13 @@ import 
org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -61,8 +57,14 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
 
 public class TransformQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"TransformQueriesTest");
+  private static final String TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
 
   private static final String D1 = "STRING_COL";
   private static final String M1 = "INT_COL1";
@@ -70,261 +72,184 @@ public class TransformQueriesTest extends BaseQueriesTest 
{
   private static final String M3 = "LONG_COL1";
   private static final String M4 = "LONG_COL2";
   private static final String TIME = "T";
-  private static final String TABLE_NAME = "FOO";
-  private static final String SEGMENT_NAME_1 = "FOO_SEGMENT_1";
-  private static final String SEGMENT_NAME_2 = "FOO_SEGMENT_2";
 
-  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), 
"TransformQueriesTest");
+  private static final int NUM_ROWS = 10;
 
-  private Schema _schema;
-  private TableConfig _tableConfig;
-  private List<IndexSegment> _indexSegments = new ArrayList<>();
-  private List<SegmentDataManager> _segmentDataManagers;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
 
-  @BeforeClass
-  public void setUp() {
-    _schema =
-        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
-            .addSingleValueDimension(M1, 
FieldSpec.DataType.INT).addSingleValueDimension(M2, FieldSpec.DataType.INT)
-            .addSingleValueDimension(M3, 
FieldSpec.DataType.LONG).addSingleValueDimension(M4, FieldSpec.DataType.LONG)
-            .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, TIME), null)
-            .build();
-    _tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME).build();
+  @Override
+  protected String getFilter() {
+    return "";
   }
 
-  @AfterClass
-  public void tearDown() {
-    FileUtils.deleteQuietly(INDEX_DIR);
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
   }
 
-  private void destroySegments() {
-    for (IndexSegment indexSegment : _indexSegments) {
-      if (indexSegment != null) {
-        indexSegment.destroy();
-      }
-    }
-    _indexSegments.clear();
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
   }
 
-  @Test
-  public void testTransformWithAvgInnerSegment()
+  @BeforeClass
+  public void setUp()
       throws Exception {
-    try {
-      final List<GenericRow> rows = createDataSet(10);
-      try (final RecordReader recordReader = new GenericRowRecordReader(rows)) 
{
-        createSegment(_tableConfig, _schema, recordReader, SEGMENT_NAME_1, 
TABLE_NAME);
-        final ImmutableSegment segment = loadSegment(SEGMENT_NAME_1);
-        _indexSegments.add(segment);
-
-        String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM foo";
-        runAndVerifyInnerSegmentQuery(query, -10000.00, 10);
-
-        query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM foo";
-        runAndVerifyInnerSegmentQuery(query, 4990000.00, 10);
-
-        query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM foo";
-        runAndVerifyInnerSegmentQuery(query, 5000000.00, 10);
-
-        query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM foo";
-        runAndVerifyInnerSegmentQuery(query, 30000.00, 10);
-
-        query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM foo";
-        runAndVerifyInnerSegmentQuery(query, 5010000.00, 10);
-
-        query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM foo";
-        runAndVerifyInnerSegmentQuery(query, 15000000.00, 10);
-
-        query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, 
LONG_COL2))) FROM foo";
-        runAndVerifyInnerSegmentQuery(query, 10.00, 10);
-
-        try {
-          query = "SELECT AVG(SUB(INT_COL1, STRING_COL)) FROM foo";
-          runAndVerifyInnerSegmentQuery(query, -10000.00, 10);
-          Assert.fail("Query should have failed");
-        } catch (Exception e) {
-        }
-
-        try {
-          query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, 
STRING_COL))) FROM foo";
-          runAndVerifyInnerSegmentQuery(query, 10.00, 10);
-          Assert.fail("Query should have failed");
-        } catch (Exception e) {
-        }
-      }
-    } finally {
-      destroySegments();
-    }
+    FileUtils.deleteQuietly(INDEX_DIR);
+
+    buildSegment();
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new 
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
   }
 
-  @Test
-  public void testTransformWithDateTruncInnerSegment()
+  protected void buildSegment()
       throws Exception {
-    Object[] columns = new Object[]{"Pinot", 1000, 2000, 500000, 1000000};
-    long start = new DateTime(1973, 1, 8, 14, 6, 4, 3, 
DateTimeZone.UTC).getMillis();
-
-    List<GenericRow> rows = new ArrayList<>();
-    for (int i = 0; i < 7; i++) {
-      GenericRow row = new GenericRow();
-      row.putField(D1, columns[0]);
-      row.putField(M1, columns[1]);
-      row.putField(M2, columns[2]);
-      row.putField(M3, columns[3]);
-      row.putField(M4, columns[4]);
-      row.putField(TIME, ThreadLocalRandom.current().nextLong(start, start + 
5000));
+    GenericRow row = new GenericRow();
+    row.putValue(D1, "Pinot");
+    row.putValue(M1, 1000);
+    row.putValue(M2, 2000);
+    row.putValue(M3, 500000);
+    row.putValue(M4, 1000000);
+    row.putValue(TIME, new DateTime(1973, 1, 8, 14, 6, 4, 3, 
DateTimeZone.UTC).getMillis());
+
+    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+    for (int i = 0; i < NUM_ROWS; i++) {
       rows.add(row);
     }
 
-    try (final RecordReader recordReader = new GenericRowRecordReader(rows)) {
-      createSegment(_tableConfig, _schema, recordReader, SEGMENT_NAME_1, 
TABLE_NAME);
-      final ImmutableSegment segment = loadSegment(SEGMENT_NAME_1);
-      _indexSegments.add(segment);
-
-      String query =
-          "SELECT COUNT(*) FROM foo GROUP BY DATETRUNC('week', ADD(SUB(DIV(T, 
1000), INT_COL2), INT_COL2), 'SECONDS', 'Europe/Berlin')";
-      verifyDateTruncationResult(query, rows.size(), "95295600");
-
-      query =
-          "SELECT COUNT(*) FROM foo GROUP BY DATETRUNC('week', 
DIV(MULT(DIV(ADD(SUB(T, 5), 5), 1000), INT_COL2), INT_COL2), 'SECONDS', 
'Europe/Berlin', 'MILLISECONDS')";
-      verifyDateTruncationResult(query, rows.size(), "95295600000");
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME).build();
+    Schema schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
+            .addSingleValueDimension(M1, 
FieldSpec.DataType.INT).addSingleValueDimension(M2, FieldSpec.DataType.INT)
+            .addSingleValueDimension(M3, 
FieldSpec.DataType.LONG).addSingleValueDimension(M4, FieldSpec.DataType.LONG)
+            .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, TIME), null).build();
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, 
schema);
+    config.setOutDir(INDEX_DIR.getPath());
+    config.setTableName(TABLE_NAME);
+    config.setSegmentName(SEGMENT_NAME);
 
-      query = "SELECT COUNT(*) FROM foo GROUP BY DATETRUNC('quarter', T, 
'MILLISECONDS')";
-      verifyDateTruncationResult(query, rows.size(), "94694400000");
-    } finally {
-      destroySegments();
+    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
+    try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+      driver.init(config, recordReader);
+      driver.build();
     }
   }
 
-  private void verifyDateTruncationResult(String query, long countResult, 
String stringKey) {
-    AggregationGroupByOperator aggregtionGroupByOperator = 
getOperatorForQuery(query);
-    IntermediateResultsBlock resultsBlock = 
aggregtionGroupByOperator.nextBlock();
-    final AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
-    List<GroupKeyGenerator.GroupKey> groupKeys = 
ImmutableList.copyOf(aggregationGroupByResult.getGroupKeyIterator());
-    Assert.assertEquals(groupKeys.size(), 1);
-    Assert.assertEquals(groupKeys.get(0)._stringKey, stringKey);
-    Object resultForKey = 
aggregationGroupByResult.getResultForKey(groupKeys.get(0), 0);
-    Assert.assertEquals(resultForKey, countResult);
+  @AfterClass
+  public void tearDown() {
+    _indexSegment.destroy();
+    FileUtils.deleteQuietly(INDEX_DIR);
   }
 
   @Test
-  public void testTransformWithAvgInterSegmentInterServer()
-      throws Exception {
-    try {
-      final List<GenericRow> segmentOneRows = createDataSet(10);
-      final List<GenericRow> segmentTwoRows = createDataSet(10);
-
-      try (final RecordReader recordReaderOne = new 
GenericRowRecordReader(segmentOneRows);
-          final RecordReader recordReaderTwo = new 
GenericRowRecordReader(segmentTwoRows)) {
-        createSegment(_tableConfig, _schema, recordReaderOne, SEGMENT_NAME_1, 
TABLE_NAME);
-        createSegment(_tableConfig, _schema, recordReaderTwo, SEGMENT_NAME_2, 
TABLE_NAME);
-
-        final ImmutableSegment segmentOne = loadSegment(SEGMENT_NAME_1);
-        final ImmutableSegment segmentTwo = loadSegment(SEGMENT_NAME_2);
-
-        _indexSegments.add(segmentOne);
-        _indexSegments.add(segmentTwo);
+  public void testTransformWithAvgInnerSegment() {
+    String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM testTable";
+    runAndVerifyInnerSegmentQuery(query, -10000.0, 10);
 
-        _segmentDataManagers =
-            Arrays.asList(new ImmutableSegmentDataManager(segmentOne), new 
ImmutableSegmentDataManager(segmentTwo));
+    query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM testTable";
+    runAndVerifyInnerSegmentQuery(query, 4990000.0, 10);
 
-        String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM foo";
-        runAndVerifyInterSegmentQuery(query, "-1000.00000");
+    query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM testTable";
+    runAndVerifyInnerSegmentQuery(query, 5000000.0, 10);
 
-        query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM foo";
-        runAndVerifyInterSegmentQuery(query, "499000.00000");
+    query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM testTable";
+    runAndVerifyInnerSegmentQuery(query, 30000.0, 10);
 
-        query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM foo";
-        runAndVerifyInterSegmentQuery(query, "500000.00000");
+    query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM testTable";
+    runAndVerifyInnerSegmentQuery(query, 5010000.0, 10);
 
-        query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM foo";
-        runAndVerifyInterSegmentQuery(query, "3000.00000");
+    query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM testTable";
+    runAndVerifyInnerSegmentQuery(query, 15000000.0, 10);
 
-        query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM foo";
-        runAndVerifyInterSegmentQuery(query, "501000.00000");
+    query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, 
LONG_COL2))) FROM testTable";
+    runAndVerifyInnerSegmentQuery(query, 10.0, 10);
 
-        query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM foo";
-        runAndVerifyInterSegmentQuery(query, "1500000.00000");
-
-        query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, 
LONG_COL2))) FROM foo";
-        runAndVerifyInterSegmentQuery(query, "1.00000");
-      }
-    } finally {
-      destroySegments();
+    try {
+      query = "SELECT AVG(SUB(INT_COL1, STRING_COL)) FROM testTable";
+      runAndVerifyInnerSegmentQuery(query, -10000.0, 10);
+      Assert.fail("Query should have failed");
+    } catch (Exception e) {
+      // Expected
     }
-  }
 
-  private List<GenericRow> createDataSet(final int numRows) {
-    final ThreadLocalRandom random = ThreadLocalRandom.current();
-    final List<GenericRow> rows = new ArrayList<>(numRows);
-    Object[] columns;
-
-    // ROW
-    GenericRow row = new GenericRow();
-    columns = new Object[]{"Pinot", 1000, 2000, 500000, 1000000};
-    row.putField(D1, columns[0]);
-    row.putField(M1, columns[1]);
-    row.putField(M2, columns[2]);
-    row.putField(M3, columns[3]);
-    row.putField(M4, columns[4]);
-    row.putField(TIME, random.nextLong(TimeUtils.getValidMinTimeMillis(), 
TimeUtils.getValidMaxTimeMillis()));
-
-    for (int i = 0; i < numRows; i++) {
-      rows.add(row);
+    try {
+      query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, 
STRING_COL))) FROM testTable";
+      runAndVerifyInnerSegmentQuery(query, 10.00, 10);
+      Assert.fail("Query should have failed");
+    } catch (Exception e) {
+      // Expected
     }
-    return rows;
   }
 
-  private void runAndVerifyInnerSegmentQuery(String query, double sum, long 
count) {
+  private void runAndVerifyInnerSegmentQuery(String query, double expectedSum, 
int expectedCount) {
     AggregationOperator aggregationOperator = getOperatorForQuery(query);
     IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
-    final List<Object> aggregationResult = resultsBlock.getAggregationResult();
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+    assertNotNull(aggregationResult);
+    assertEquals(aggregationResult.size(), 1);
     AvgPair avgPair = (AvgPair) aggregationResult.get(0);
-    Assert.assertEquals(avgPair.getSum(), sum);
-    Assert.assertEquals(avgPair.getCount(), count);
+    assertEquals(avgPair.getSum(), expectedSum);
+    assertEquals(avgPair.getCount(), expectedCount);
   }
 
-  private void runAndVerifyInterSegmentQuery(String query, String serialized) {
-    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
-    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
-    Assert.assertEquals(aggregationResults.size(), 1);
-    Serializable value = aggregationResults.get(0).getValue();
-    Assert.assertEquals(value.toString(), serialized);
-  }
+  @Test
+  public void testTransformWithDateTruncInnerSegment() {
+    String query =
+        "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('week', 
ADD(SUB(DIV(T, 1000), INT_COL2), INT_COL2), 'SECONDS', 'Europe/Berlin')";
+    verifyDateTruncationResult(query, "95295600");
 
-  @Override
-  protected String getFilter() {
-    return "";
-  }
+    query =
+        "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('week', 
DIV(MULT(DIV(ADD(SUB(T, 5), 5), 1000), INT_COL2), INT_COL2), 'SECONDS', 
'Europe/Berlin', 'MILLISECONDS')";
+    verifyDateTruncationResult(query, "95295600000");
 
-  @Override
-  protected IndexSegment getIndexSegment() {
-    return _indexSegments.get(0);
+    query = "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('quarter', T, 
'MILLISECONDS')";
+    verifyDateTruncationResult(query, "94694400000");
   }
 
-  @Override
-  protected List<SegmentDataManager> getSegmentDataManagers() {
-    return _segmentDataManagers;
+  private void verifyDateTruncationResult(String query, String 
expectedStringKey) {
+    AggregationGroupByOperator aggregationGroupByOperator = 
getOperatorForQuery(query);
+    IntermediateResultsBlock resultsBlock = 
aggregationGroupByOperator.nextBlock();
+    AggregationGroupByResult aggregationGroupByResult = 
resultsBlock.getAggregationGroupByResult();
+    assertNotNull(aggregationGroupByResult);
+    List<GroupKeyGenerator.GroupKey> groupKeys = 
ImmutableList.copyOf(aggregationGroupByResult.getGroupKeyIterator());
+    assertEquals(groupKeys.size(), 1);
+    assertEquals(groupKeys.get(0)._stringKey, expectedStringKey);
+    Object resultForKey = 
aggregationGroupByResult.getResultForKey(groupKeys.get(0), 0);
+    assertEquals(resultForKey, (long) NUM_ROWS);
   }
 
-  private void createSegment(TableConfig tableConfig, Schema schema, 
RecordReader recordReader, String segmentName,
-      String tableName)
-      throws Exception {
-    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
-    segmentGeneratorConfig.setTableName(tableName);
-    segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
-    segmentGeneratorConfig.setSegmentName(segmentName);
+  @Test
+  public void testTransformWithAvgInterSegmentInterServer() {
+    String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM testTable";
+    runAndVerifyInterSegmentQuery(query, "-1000.00000");
 
-    SegmentIndexCreationDriverImpl driver = new 
SegmentIndexCreationDriverImpl();
-    driver.init(segmentGeneratorConfig, recordReader);
-    driver.build();
+    query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM testTable";
+    runAndVerifyInterSegmentQuery(query, "499000.00000");
 
-    File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), segmentName);
-    if (!segmentIndexDir.exists()) {
-      throw new IllegalStateException("Segment generation failed");
-    }
+    query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM testTable";
+    runAndVerifyInterSegmentQuery(query, "500000.00000");
+
+    query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM testTable";
+    runAndVerifyInterSegmentQuery(query, "3000.00000");
+
+    query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM testTable";
+    runAndVerifyInterSegmentQuery(query, "501000.00000");
+
+    query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM testTable";
+    runAndVerifyInterSegmentQuery(query, "1500000.00000");
+
+    query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1, 
LONG_COL2))) FROM testTable";
+    runAndVerifyInterSegmentQuery(query, "1.00000");
   }
 
-  private ImmutableSegment loadSegment(String segmentName)
-      throws Exception {
-    return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), 
ReadMode.heap);
+  private void runAndVerifyInterSegmentQuery(String query, String 
expectedValue) {
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    List<AggregationResult> aggregationResults = 
brokerResponse.getAggregationResults();
+    assertEquals(aggregationResults.size(), 1);
+    Serializable value = aggregationResults.get(0).getValue();
+    assertEquals(value, expectedValue);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to