This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 54bea86f89c Migrate GroupByOptions, GroupByTrimming, and RowExpression 
integration tests to custom package (#17857)
54bea86f89c is described below

commit 54bea86f89c082981f839622fc70b211aa073a5f
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Mar 12 01:24:10 2026 -0700

    Migrate GroupByOptions, GroupByTrimming, and RowExpression integration 
tests to custom package (#17857)
    
    Move these tests to share a single Pinot cluster via 
CustomDataQueryClusterIntegrationTest
    (@BeforeSuite/@AfterSuite lifecycle) instead of each test starting its own 
cluster. This
    reduces total integration test setup time by eliminating 3 redundant 
cluster startups.
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../AggregateMetricsClusterIntegrationTest.java    | 125 -------------
 .../GroupByEnableTrimOptionIntegrationTest.java    |   9 +-
 .../StarTreeFunctionParametersIntegrationTest.java |   5 +-
 .../tests/custom/AggregateMetricsTest.java         | 133 +++++++++++++
 .../BigNumberOfSegmentsTest.java}                  | 129 ++++---------
 .../CLPEncodingRealtimeTest.java}                  | 128 +++++--------
 .../GroupByOptionsTest.java}                       | 129 ++++++-------
 .../GroupByTrimmingTest.java}                      | 173 ++++++++---------
 .../tests/custom/MultiColumnTextIndicesTest.java   |   2 +-
 .../RowExpressionTest.java}                        | 163 +++++++---------
 .../StarTreeTest.java}                             | 205 +++++++++++----------
 11 files changed, 542 insertions(+), 659 deletions(-)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
deleted file mode 100644
index b7ba0103ab3..00000000000
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration test that enables aggregate metrics for the LLC real-time table.
- */
-public class AggregateMetricsClusterIntegrationTest extends 
BaseClusterIntegrationTestSet {
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
-
-    // Start the Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServer();
-
-    // Start Kafka
-    startKafka();
-
-    // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
-
-    // Create and upload the schema and table config with reduced number of 
columns and aggregate metrics on
-    Schema schema =
-        new 
Schema.SchemaBuilder().setSchemaName(getTableName()).addSingleValueDimension("Carrier",
 DataType.STRING)
-            .addSingleValueDimension("Origin", 
DataType.STRING).addMetric("AirTime", DataType.LONG)
-            .addMetric("ArrDelay", DataType.DOUBLE)
-            .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", 
"1:DAYS").build();
-    addSchema(schema);
-    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
-    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
-    indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
-    
indexingConfig.setInvertedIndexColumns(Collections.singletonList("Origin"));
-    indexingConfig.setNoDictionaryColumns(Arrays.asList("AirTime", 
"ArrDelay"));
-    
indexingConfig.setRangeIndexColumns(Collections.singletonList("DaysSinceEpoch"));
-    indexingConfig.setBloomFilterColumns(Collections.singletonList("Origin"));
-    indexingConfig.setAggregateMetrics(true);
-    addTableConfig(tableConfig);
-
-    // Push data into Kafka
-    pushAvroIntoKafka(avroFiles);
-
-    // Set up the H2 connection
-    setUpH2Connection(avroFiles);
-
-    // Wait for all documents loaded
-    waitForAllDocsLoaded(600_000L);
-  }
-
-  @Override
-  protected void waitForAllDocsLoaded(long timeoutMs) {
-    // NOTE: For aggregate metrics, we need to test the aggregation result 
instead of the document count because
-    //       documents can be merged during ingestion.
-    String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        JsonNode queryResult = postQuery(sql);
-        JsonNode aggregationResults = 
queryResult.get("resultTable").get("rows").get(0);
-        return aggregationResults.get(0).asInt() == -165429728 && 
aggregationResults.get(1).asInt() == -175625957;
-      } catch (Exception e) {
-        return null;
-      }
-    }, 100L, timeoutMs, "Failed to load all documents");
-  }
-
-  @Test(dataProvider = "useBothQueryEngines")
-  public void testQueries(boolean useMultiStageQueryEngine)
-      throws Exception {
-    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    String query = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
-    testQuery(query);
-    query = "SELECT SUM(AirTime), DaysSinceEpoch FROM mytable GROUP BY 
DaysSinceEpoch ORDER BY SUM(AirTime) DESC";
-    testQuery(query);
-    query = "SELECT Origin, SUM(ArrDelay) FROM mytable WHERE Carrier = 'AA' 
GROUP BY Origin ORDER BY Origin";
-    testQuery(query);
-  }
-
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    dropRealtimeTable(getTableName());
-    stopServer();
-    stopBroker();
-    stopController();
-    stopKafka();
-    stopZk();
-    FileUtils.deleteDirectory(_tempDir);
-  }
-}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
index 4f534e458c0..98027bc7d66 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.commons.io.FileUtils;
+import org.apache.pinot.integration.tests.custom.GroupByOptionsTest;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -41,7 +42,7 @@ import org.testng.annotations.Test;
 import static 
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
 
 
-// similar to GroupByOptionsIntegrationTest but this test verifies that 
default enable group trim option works even
+// similar to GroupByOptionsTest but this test verifies that default enable 
group trim option works even
 // if hint is not set in the query
 public class GroupByEnableTrimOptionIntegrationTest extends 
BaseClusterIntegrationTestSet {
 
@@ -69,7 +70,7 @@ public class GroupByEnableTrimOptionIntegrationTest extends 
BaseClusterIntegrati
     TableConfig tableConfig = createOfflineTableConfig();
     addTableConfig(tableConfig);
 
-    List<File> avroFiles = 
GroupByOptionsIntegrationTest.createAvroFile(_tempDir);
+    List<File> avroFiles = GroupByOptionsTest.createAvroFile(_tempDir);
     ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
     uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
 
@@ -206,8 +207,8 @@ public class GroupByEnableTrimOptionIntegrationTest extends 
BaseClusterIntegrati
     JsonNode result = postV2Query(sql);
     JsonNode plan = postV2Query(option + " set explainAskingServers=true; 
explain plan for " + query);
 
-    Assert.assertEquals(GroupByOptionsIntegrationTest.toResultStr(result), 
expectedResult);
-    Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan, 
true), expectedPlan);
+    Assert.assertEquals(GroupByOptionsTest.toResultStr(result), 
expectedResult);
+    Assert.assertEquals(GroupByOptionsTest.toExplainStr(plan, true), 
expectedPlan);
   }
 
   private JsonNode postV2Query(String query)
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
index 9bf9da4e18c..097255cf821 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
@@ -28,6 +28,7 @@ import java.util.function.Function;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.integration.tests.custom.StarTreeTest;
 import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -226,7 +227,7 @@ public class StarTreeFunctionParametersIntegrationTest 
extends BaseClusterIntegr
    */
   private void checkQueryDoesNotUseStarTreeIndex(String query, int 
expectedResult) throws Exception {
     JsonNode explainPlan = postQuery("EXPLAIN PLAN FOR " + query);
-    
assertFalse(explainPlan.toString().contains(StarTreeClusterIntegrationTest.FILTER_STARTREE_INDEX));
+    
assertFalse(explainPlan.toString().contains(StarTreeTest.FILTER_STARTREE_INDEX));
     assertEquals(getDistinctCountResult(query), expectedResult);
   }
 
@@ -244,7 +245,7 @@ public class StarTreeFunctionParametersIntegrationTest 
extends BaseClusterIntegr
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
-          return 
result.toString().contains(StarTreeClusterIntegrationTest.FILTER_STARTREE_INDEX);
+          return 
result.toString().contains(StarTreeTest.FILTER_STARTREE_INDEX);
         }, 1000L, 10_000L, "Failed to use star-tree index for query: " + query
     );
     assertEquals(getDistinctCountResult(query), expectedResult);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/AggregateMetricsTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/AggregateMetricsTest.java
new file mode 100644
index 00000000000..d35e4f40e69
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/AggregateMetricsTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that enables aggregate metrics for the LLC real-time table.
+ */
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class AggregateMetricsTest extends 
CustomDataQueryClusterIntegrationTest {
+
+  private static final long EXPECTED_SUM_AIR_TIME = -165429728L;
+  private static final long EXPECTED_SUM_ARR_DELAY = -175625957L;
+
+  @Override
+  public String getTableName() {
+    return "AggregateMetricsTest";
+  }
+
+  @Override
+  public boolean isRealtimeTable() {
+    return true;
+  }
+
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addSingleValueDimension("Carrier", DataType.STRING)
+        .addSingleValueDimension("Origin", DataType.STRING)
+        .addMetric("AirTime", DataType.LONG)
+        .addMetric("ArrDelay", DataType.DOUBLE)
+        .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+        .build();
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    return unpackAvroData(_tempDir);
+  }
+
+  @Override
+  protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+    TableConfig tableConfig = super.createRealtimeTableConfig(sampleAvroFile);
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+    indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
+    
indexingConfig.setInvertedIndexColumns(Collections.singletonList("Origin"));
+    indexingConfig.setNoDictionaryColumns(Arrays.asList("AirTime", 
"ArrDelay"));
+    
indexingConfig.setRangeIndexColumns(Collections.singletonList("DaysSinceEpoch"));
+    indexingConfig.setBloomFilterColumns(Collections.singletonList("Origin"));
+    indexingConfig.setAggregateMetrics(true);
+    return tableConfig;
+  }
+
+  @Nullable
+  @Override
+  protected String getSortedColumn() {
+    return null;
+  }
+
+  @Override
+  public String getTimeColumnName() {
+    return "DaysSinceEpoch";
+  }
+
+  @Override
+  protected void waitForAllDocsLoaded(long timeoutMs) {
+    // For aggregate metrics, documents can be merged during ingestion, so we 
check aggregation results
+    // instead of document count.
+    String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM " + getTableName();
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResult = postQuery(sql);
+        JsonNode aggregationResults = 
queryResult.get("resultTable").get("rows").get(0);
+        return aggregationResults.get(0).asLong() == EXPECTED_SUM_AIR_TIME
+            && aggregationResults.get(1).asLong() == EXPECTED_SUM_ARR_DELAY;
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+  }
+
+  @Test
+  public void testAggregateMetricsQueries()
+      throws Exception {
+    // Test total aggregation
+    JsonNode result = postQuery("SELECT SUM(AirTime), SUM(ArrDelay) FROM " + 
getTableName());
+    JsonNode rows = result.get("resultTable").get("rows").get(0);
+    assertEquals(rows.get(0).asLong(), EXPECTED_SUM_AIR_TIME);
+    assertEquals(rows.get(1).asLong(), EXPECTED_SUM_ARR_DELAY);
+
+    // Test group by with order
+    result = postQuery("SELECT SUM(AirTime), DaysSinceEpoch FROM " + 
getTableName()
+        + " GROUP BY DaysSinceEpoch ORDER BY SUM(AirTime) DESC LIMIT 1");
+    assertEquals(result.get("exceptions").size(), 0);
+
+    // Test filter with group by
+    result = postQuery("SELECT Origin, SUM(ArrDelay) FROM " + getTableName()
+        + " WHERE Carrier = 'AA' GROUP BY Origin ORDER BY Origin LIMIT 10");
+    assertEquals(result.get("exceptions").size(), 0);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BigNumberOfSegmentsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BigNumberOfSegmentsTest.java
similarity index 61%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BigNumberOfSegmentsIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BigNumberOfSegmentsTest.java
index aaf8d5ec009..426925545ba 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BigNumberOfSegmentsIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BigNumberOfSegmentsTest.java
@@ -16,12 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -30,16 +29,12 @@ import org.apache.avro.Schema.Type;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
@@ -50,7 +45,8 @@ import static 
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.get
  * Test is rather slow (1 minute+) and thus disabled by default.
  * Still, it can be useful to run it manually.
  */
-public class BigNumberOfSegmentsIntegrationTest extends 
BaseClusterIntegrationTestSet {
+@Test(suiteName = "CustomClusterIntegrationTest", enabled = false)
+public class BigNumberOfSegmentsTest extends 
CustomDataQueryClusterIntegrationTest {
 
   static final int FILES_NO = 1000;
   static final int RECORDS_NO = 5;
@@ -60,88 +56,52 @@ public class BigNumberOfSegmentsIntegrationTest extends 
BaseClusterIntegrationTe
   static final String FLOAT_COL = "f";
   static final String DOUBLE_COL = "d";
   static final int STR_COL_NUM = 200;
+  private static final String TIME_COL = "ts";
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    System.out.println("Pid is " + ProcessHandle.current().pid());
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    startZk();
-    startKafka();
-    startController();
-    startServer();
-    startBroker();
+  @Override
+  public String getTableName() {
+    return "BigNumberOfSegmentsTest";
+  }
 
+  @Override
+  public Schema createSchema() {
     Schema.SchemaBuilder builder = new Schema.SchemaBuilder()
-        .setSchemaName(DEFAULT_SCHEMA_NAME)
+        .setSchemaName(getTableName())
         .addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
         .addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
         .addSingleValueDimension(FLOAT_COL, FieldSpec.DataType.FLOAT)
         .addSingleValueDimension(DOUBLE_COL, FieldSpec.DataType.DOUBLE)
-        .addDateTimeField(DEFAULT_TIME_COLUMN_NAME, 
FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS");
+        .addDateTimeField(TIME_COL, FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", 
"1:MILLISECONDS");
 
     for (int i = 0; i < STR_COL_NUM; i++) {
       builder.addSingleValueDimension(STR_COL_PREFIX + i, 
FieldSpec.DataType.STRING);
     }
 
-    Schema schema = builder.build();
-
-    List<File> avroFiles = createAvroFiles(_tempDir);
-
-    addSchema(schema);
-    TableConfig tableConfig = createOfflineTableConfig(schema);
-    addTableConfig(tableConfig);
-
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
-    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
-
-    // Wait for all documents loaded
-    TestUtils.waitForCondition(() -> 
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
-        600_000,
-        "Failed to load  documents", true, Duration.ofMillis(60_000 / 10));
-
-    setUseMultiStageQueryEngine(true);
+    return builder.build();
   }
 
-  // to slow for CI (1+ minute)
-  @Test(enabled = false)
-  public void testCreateManySegments()
+  @Override
+  public List<File> createAvroFiles()
       throws Exception {
-    JsonNode node = postV2Query("select sum(i) + sum(j) + sum(d), count(*) 
from " + DEFAULT_TABLE_NAME);
-
-    assertNoError(node);
+    return createAvroData(_tempDir);
   }
 
-  protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
-    AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
-    return new TableConfigBuilder(TableType.REALTIME)
-        .setTableName(getTableName())
-        .setTimeColumnName(getTimeColumnName())
-        .setInvertedIndexColumns(getInvertedIndexColumns())
-        .setRangeIndexColumns(getRangeIndexColumns())
-        .setFieldConfigList(getFieldConfigs())
-        .setNumReplicas(getNumReplicas())
-        .setSegmentVersion(getSegmentVersion())
-        .setLoadMode(getLoadMode())
-        .setTaskConfig(getTaskConfig())
-        .setBrokerTenant(getBrokerTenant())
-        .setServerTenant(getServerTenant())
-        .setIngestionConfig(getIngestionConfig())
-        .setQueryConfig(getQueryConfig())
-        .setStreamConfigs(getStreamConfigs())
-        .setNullHandlingEnabled(getNullHandlingEnabled())
-        .build();
+  @Override
+  public int getNumAvroFiles() {
+    return FILES_NO;
   }
 
-  protected int getRealtimeSegmentFlushSize() {
-    return RECORDS_NO;
+  @Override
+  protected long getCountStarResult() {
+    return (long) FILES_NO * RECORDS_NO;
   }
 
-  TableConfig createOfflineTableConfig(Schema schema) {
+  @Override
+  public TableConfig createOfflineTableConfig() {
+    Schema schema = createSchema();
     return new TableConfigBuilder(TableType.OFFLINE)
         .setTableName(getTableName())
-        .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME)
+        .setTimeColumnName(TIME_COL)
         .setNumReplicas(getNumReplicas())
         .setBrokerTenant(getBrokerTenant())
         .setRetentionTimeUnit("DAYS")
@@ -150,37 +110,20 @@ public class BigNumberOfSegmentsIntegrationTest extends 
BaseClusterIntegrationTe
         .build();
   }
 
-  @Override
-  protected List<String> getInvertedIndexColumns() {
-    return Arrays.asList(FLOAT_COL, INT_COL);
-  }
-
-  @Override
-  protected List<String> getRangeIndexColumns() {
-    return Arrays.asList(INT_COL);
-  }
-
-  private JsonNode postV2Query(String query)
+  // Too slow for CI (1+ minute)
+  @Test(enabled = false)
+  public void testCreateManySegments()
       throws Exception {
-    return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), true), 
null,
-        getExtraQueryProperties());
+    JsonNode node = postQuery("SELECT sum(i) + sum(j) + sum(d), count(*) FROM 
" + getTableName(),
+        getBrokerQueryApiUrl(getBrokerBaseApiUrl(), true), null, 
getExtraQueryProperties());
+    assertEquals(node.get("exceptions").size(), 0);
   }
 
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    dropOfflineTable(DEFAULT_TABLE_NAME);
-
-    stopServer();
-    stopBroker();
-    stopController();
-    stopKafka();
-    stopZk();
-
-    FileUtils.deleteDirectory(_tempDir);
+  private static void assertEquals(int actual, int expected) {
+    org.testng.Assert.assertEquals(actual, expected);
   }
 
-  private static List<File> createAvroFiles(File tempDir)
+  private static List<File> createAvroData(File tempDir)
       throws IOException {
 
     // create avro schema
@@ -190,7 +133,7 @@ public class BigNumberOfSegmentsIntegrationTest extends 
BaseClusterIntegrationTe
     fields.add(new Field(LONG_COL, create(Type.LONG), null, null));
     fields.add(new Field(FLOAT_COL, create(Type.FLOAT), null, null));
     fields.add(new Field(DOUBLE_COL, create(Type.DOUBLE), null, null));
-    fields.add(new Field(DEFAULT_TIME_COLUMN_NAME, create(Type.LONG), null, 
null));
+    fields.add(new Field(TIME_COL, create(Type.LONG), null, null));
     for (int i = 0; i < STR_COL_NUM; i++) {
       fields.add(new Field(STR_COL_PREFIX + i, create(Type.STRING), null, 
null));
     }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CLPEncodingRealtimeTest.java
similarity index 61%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CLPEncodingRealtimeTest.java
index e9f409bfe17..eec8b128045 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CLPEncodingRealtimeTest.java
@@ -16,136 +16,94 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
 
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
-public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTestSet {
-  private List<File> _avroFiles;
-  private FieldConfig.CompressionCodec _selectedCompressionCodec;
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class CLPEncodingRealtimeTest extends 
CustomDataQueryClusterIntegrationTest {
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-    _avroFiles = unpackAvroData(_tempDir);
-
-    // Randomly select CLP or CLPV2 compression codec
-    _selectedCompressionCodec =
-        RANDOM.nextBoolean() ? FieldConfig.CompressionCodec.CLP : 
FieldConfig.CompressionCodec.CLPV2;
-
-    // Start the Pinot cluster
-    startZk();
-    startKafka();
-    // Start a customized controller with more frequent realtime segment 
validation
-    startController();
-    startBroker();
-    startServer();
-    pushAvroIntoKafka(_avroFiles);
-
-    Schema schema = createSchema();
-    addSchema(schema);
-    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
-    addTableConfig(tableConfig);
-
-    waitForAllDocsLoaded(600_000L);
-  }
+  private final FieldConfig.CompressionCodec _selectedCompressionCodec =
+      RANDOM.nextBoolean() ? FieldConfig.CompressionCodec.CLP : 
FieldConfig.CompressionCodec.CLPV2;
 
-  @Nullable
   @Override
-  protected List<String> getInvertedIndexColumns() {
-    return null;
+  public String getTableName() {
+    return "CLPEncodingRealtimeTest";
   }
 
-  @Nullable
   @Override
-  protected List<String> getRangeIndexColumns() {
-    return null;
+  public boolean isRealtimeTable() {
+    return true;
   }
 
-  @Nullable
   @Override
-  protected List<String> getBloomFilterColumns() {
-    return null;
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
+        .addSingleValueDimension("logLine", FieldSpec.DataType.STRING)
+        .addDateTimeField("timestampInEpoch", FieldSpec.DataType.LONG, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
   }
 
-  @Nullable
   @Override
-  protected String getSortedColumn() {
-    return null;
+  public List<File> createAvroFiles()
+      throws Exception {
+    return unpackTarData("clpEncodingITData.tar.gz", _tempDir);
   }
 
   @Override
-  protected List<String> getNoDictionaryColumns() {
-    return Collections.singletonList("logLine");
-  }
-
-  @Test
-  public void testValues()
-      throws Exception {
-    Assert.assertEquals(getPinotConnection().execute(
-            "SELECT count(*) FROM " + getTableName() + " WHERE 
REGEXP_LIKE(logLine, '.*executor.*')").getResultSet(0)
-        .getLong(0), 53);
+  public String getTimeColumnName() {
+    return "timestampInEpoch";
   }
 
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    try {
-      dropRealtimeTable(getTableName());
-      stopServer();
-      stopBroker();
-      stopController();
-      stopKafka();
-      stopZk();
-    } finally {
-      FileUtils.deleteQuietly(_tempDir);
-    }
+  @Override
+  protected long getCountStarResult() {
+    return 100;
   }
 
+  @Override
   protected int getRealtimeSegmentFlushSize() {
     return 30;
   }
 
+  @Nullable
   @Override
-  protected long getCountStarResult() {
-    return 100;
+  protected List<String> getInvertedIndexColumns() {
+    return null;
   }
 
+  @Nullable
   @Override
-  protected String getTableName() {
-    return "clpEncodingIT";
+  protected List<String> getRangeIndexColumns() {
+    return null;
   }
 
+  @Nullable
   @Override
-  protected String getAvroTarFileName() {
-    return "clpEncodingITData.tar.gz";
+  protected List<String> getBloomFilterColumns() {
+    return null;
   }
 
+  @Nullable
   @Override
-  protected String getSchemaFileName() {
-    return "clpEncodingRealtimeIntegrationTestSchema.schema";
+  protected String getSortedColumn() {
+    return null;
   }
 
   @Override
-  protected String getTimeColumnName() {
-    return "timestampInEpoch";
+  protected List<String> getNoDictionaryColumns() {
+    return Collections.singletonList("logLine");
   }
 
   @Override
@@ -153,7 +111,6 @@ public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTe
     List<FieldConfig> fieldConfigs = new ArrayList<>();
     fieldConfigs.add(new 
FieldConfig.Builder("logLine").withEncodingType(FieldConfig.EncodingType.RAW)
         .withCompressionCodec(_selectedCompressionCodec).build());
-
     return fieldConfigs;
   }
 
@@ -164,7 +121,14 @@ public class CLPEncodingRealtimeIntegrationTest extends 
BaseClusterIntegrationTe
 
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setTransformConfigs(transforms);
-
     return ingestionConfig;
   }
+
+  @Test
+  public void testValues()
+      throws Exception {
+    Assert.assertEquals(getPinotConnection().execute(
+            "SELECT count(*) FROM " + getTableName() + " WHERE 
REGEXP_LIKE(logLine, '.*executor.*')").getResultSet(0)
+        .getLong(0), 53);
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
similarity index 91%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
index 54efd3ccbd5..8d3c8d2672e 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.File;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -30,23 +29,20 @@ import java.util.Properties;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
 
 
-public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet {
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class GroupByOptionsTest extends CustomDataQueryClusterIntegrationTest {
 
   static final int FILES_NO = 4;
   static final int RECORDS_NO = 20;
@@ -55,42 +51,32 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   static final String RESULT_TABLE = "resultTable";
   static final int SERVERS_NO = 2;
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    startZk();
-    startController();
-    startServers(SERVERS_NO);
-    startBroker();
+  @Override
+  public String getTableName() {
+    return "GroupByOptionsTest";
+  }
 
-    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
         .addSingleValueDimension(I_COL, FieldSpec.DataType.INT)
         .addSingleValueDimension(J_COL, FieldSpec.DataType.LONG)
         .build();
-    addSchema(schema);
-    TableConfig tableConfig = createOfflineTableConfig();
-    addTableConfig(tableConfig);
-
-    List<File> avroFiles = createAvroFile(_tempDir);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
-    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
-
-    // Wait for all documents loaded
-    TestUtils.waitForCondition(() -> 
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
-        60_000,
-        "Failed to load  documents", true, Duration.ofMillis(60_000 / 10));
-
-    setUseMultiStageQueryEngine(true);
+  }
 
-    Map<String, List<String>> map = 
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    return createAvroFile(_tempDir);
+  }
 
-    // make sure segments are split between multiple servers
-    Assert.assertEquals(map.size(), SERVERS_NO);
+  @Override
+  protected long getCountStarResult() {
+    return FILES_NO * RECORDS_NO;
   }
 
-  protected TableConfig createOfflineTableConfig() {
+  @Override
+  public TableConfig createOfflineTableConfig() {
     return new TableConfigBuilder(TableType.OFFLINE)
         .setTableName(getTableName())
         .setNumReplicas(getNumReplicas())
@@ -130,6 +116,12 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void 
testOrderByKeysIsNotPushedToFinalAggregationWhenGroupTrimHintIsDisabled()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
+    Map<String, List<String>> map = 
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+    // make sure segments are split between multiple servers
+    Assert.assertEquals(map.size(), SERVERS_NO);
+
     String trimDisabledPlan = "Execution Plan\n"
         + "LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC], 
offset=[0], fetch=[1])\n"
         + "  PinotLogicalSortExchange(distribution=[hash], collation=[[0, 1 
DESC]], isSortOnSender=[false], "
@@ -137,7 +129,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
         + "    LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC], 
fetch=[1])\n"
         + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL])\n"
         + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-        + "          LeafStageCombineOperator(table=[mytable])\n"
+        + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
         + "            StreamingInstanceResponse\n"
         + "              CombineGroupBy\n"
         + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -171,6 +163,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void 
testOrderByKeysIsPushedToFinalAggregationStageWithoutGroupTrimSize()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     // is_enable_group_trim enables V1-style trimming in leaf nodes,
     // with numGroupsLimit and minSegmentGroupTrimSize,
     // while group_trim_size - in final aggregation node
@@ -197,7 +191,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[0, 1 "
             + "DESC]], limit=[1])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -209,6 +203,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testOrderByKeysIsPushedToFinalAggregationStageWithGroupTrimSize()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     // is_enable_group_trim enables V1-style trimming in leaf nodes, with 
numGroupsLimit and minSegmentGroupTrimSize,
     // while group_trim_size - in final aggregation node .
     // Same as above, to stabilize result here, we override global 
numGroupsLimit option with num_groups_limit hint.
@@ -231,7 +227,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[0, 1 "
             + "DESC]], limit=[1])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -243,6 +239,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testOrderByKeysIsPushedToFinalAggregationStage()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     assertResultAndPlan(
         // group_trim_size should sort and limit v2 aggregate output if order 
by and limit is propagated
         " ",
@@ -263,7 +261,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[0, "
             + "1]], limit=[3])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -275,6 +273,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testHavingOnKeysAndOrderByKeysIsPushedToFinalAggregationStage()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     assertResultAndPlan(
         // group_trim_size should sort and limit v2 aggregate output if order 
by and limit is propagated
         " ",
@@ -296,7 +296,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[0, "
             + "1]], limit=[3])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -308,6 +308,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testGroupByKeysWithOffsetIsPushedToFinalAggregationStage()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     // if offset is set, leaf should return more results to intermediate stage
     assertResultAndPlan(
         "",
@@ -329,7 +331,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[0, "
             + "1]], limit=[4])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -342,6 +344,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testOrderByByKeysAndValuesIsPushedToFinalAggregationStage()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     // group_trim_size should sort and limit v2 aggregate output if order by 
and limit is propagated
     assertResultAndPlan(
         " ",
@@ -364,7 +368,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[0 "
             + "DESC, 1 DESC, 2 DESC]], limit=[3])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -377,6 +381,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testOrderByKeyValueExpressionIsNotPushedToFinalAggregateStage()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     // Order by both expression based on keys and aggregate values.
     // Expression & limit are not available until after aggregation so they 
can't be pushed down.
     // Because of that, group_trim_size is not applied.
@@ -402,7 +408,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "      LogicalProject(i=[$0], j=[$1], cnt=[$2], EXPR$3=[*(*($0, 
$1), $2)])\n"
             + "        PinotLogicalAggregate(group=[{0, 1}], 
agg#0=[COUNT($2)], aggType=[FINAL])\n"
             + "          PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "            LeafStageCombineOperator(table=[mytable])\n"
+            + "            LeafStageCombineOperator(table=[" + getTableName() 
+ "])\n"
             + "              StreamingInstanceResponse\n"
             + "                CombineGroupBy\n"
             + "                  GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -415,6 +421,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testForGroupByOverJoinOrderByKeyIsPushedToAggregationLeafStage()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     // query uses V2 aggregate operator for both leaf and final stages because 
of join
     assertResultAndPlan(
         " ",
@@ -442,18 +450,18 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
             + "1]], limit=[5])\n"
             + "            LogicalJoin(condition=[true], joinType=[inner])\n"
             + "              PinotLogicalExchange(distribution=[random])\n"
-            + "                LeafStageCombineOperator(table=[mytable])\n"
+            + "                LeafStageCombineOperator(table=[" + 
getTableName() + "])\n"
             + "                  StreamingInstanceResponse\n"
             + "                    StreamingCombineSelect\n"
-            + "                      SelectStreaming(table=[mytable], 
totalDocs=[80])\n"
+            + "                      SelectStreaming(table=[" + getTableName() 
+ "], totalDocs=[80])\n"
             + "                        Project(columns=[[i, j]])\n"
             + "                          DocIdSet(maxDocs=[40000])\n"
             + "                            
FilterMatchEntireSegment(numDocs=[80])\n"
             + "              PinotLogicalExchange(distribution=[broadcast])\n"
-            + "                LeafStageCombineOperator(table=[mytable])\n"
+            + "                LeafStageCombineOperator(table=[" + 
getTableName() + "])\n"
             + "                  StreamingInstanceResponse\n"
             + "                    StreamingCombineSelect\n"
-            + "                      SelectStreaming(table=[mytable], 
totalDocs=[80])\n"
+            + "                      SelectStreaming(table=[" + getTableName() 
+ "], totalDocs=[80])\n"
             + "                        Transform(expressions=[['0']])\n"
             + "                          Project(columns=[[]])\n"
             + "                            DocIdSet(maxDocs=[40000])\n"
@@ -461,7 +469,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
     );
   }
 
-  public void assertResultAndPlan(String option, String query, String 
expectedResult, String expectedPlan)
+  private void assertResultAndPlan(String option, String query, String 
expectedResult, String expectedPlan)
       throws Exception {
     String sql = option
         //disable timeout in debug
@@ -478,6 +486,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void 
testExceptionIsThrownWhenErrorOnNumGroupsLimitHintIsSetAndLimitIsReachedV1()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     String query = " select /*+  
aggOptions(num_groups_limit='1',error_on_num_groups_limit='true') */"
         + " i, j, count(*) as cnt "
         + " from " + getTableName()
@@ -490,6 +500,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void 
testExceptionIsThrownWhenErrorOnNumGroupsLimitHintIsSetAndLimitIsReachedV2()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     String query = " set numGroupsLimit=1;"
         + " select /*+  aggOptions(error_on_num_groups_limit='true') */"
         + " i, j, count(*) as cnt "
@@ -503,6 +515,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void 
testExceptionIsThrownWhenErrorOnNumGroupsLimitOptionIsSetAndLimitIsReachedV1()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     String query = " set errorOnNumGroupsLimit=true; set numGroupsLimit=1;"
         + " select i, j, count(*) as cnt "
         + " from " + getTableName()
@@ -515,6 +529,8 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void 
testExceptionIsThrownWhenErrorOnNumGroupsLimitOptionIsSetAndLimitIsReachedV2()
       throws Exception {
+    setUseMultiStageQueryEngine(true);
+
     String query = " set errorOnNumGroupsLimit=true; "
         + "select /*+  aggOptions(num_groups_limit='1') */ i, j, count(*) as 
cnt "
         + " from " + getTableName()
@@ -571,7 +587,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
     return toString(node);
   }
 
-  static String toExplainStr(JsonNode mainNode, boolean isMSQE) {
+  public static String toExplainStr(JsonNode mainNode, boolean isMSQE) {
     if (mainNode == null) {
       return "null";
     }
@@ -582,7 +598,7 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
     return toExplainString(node, isMSQE);
   }
 
-  static String toExplainStr(JsonNode mainNode) {
+  public static String toExplainStr(JsonNode mainNode) {
     return toExplainStr(mainNode, false);
   }
 
@@ -643,17 +659,4 @@ public class GroupByOptionsIntegrationTest extends 
BaseClusterIntegrationTestSet
       return result.toString();
     }
   }
-
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    dropOfflineTable(DEFAULT_TABLE_NAME);
-
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-
-    FileUtils.deleteDirectory(_tempDir);
-  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByTrimmingTest.java
similarity index 86%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByTrimmingTest.java
index 689b7de0a76..00993a75a6d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByTrimmingTest.java
@@ -16,18 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
 
 import java.io.File;
 import java.io.IOException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.client.Connection;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -35,13 +33,10 @@ import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static 
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toExplainStr;
-import static 
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toResultStr;
+import static 
org.apache.pinot.integration.tests.custom.GroupByOptionsTest.toExplainStr;
+import static 
org.apache.pinot.integration.tests.custom.GroupByOptionsTest.toResultStr;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
@@ -52,7 +47,8 @@ import static org.testng.Assert.assertTrue;
 // MSQE - segment, inter-segment and intermediate levels
 // Note: MSQE doesn't push collations depending on group by result into 
aggregation nodes
 // so e.g. ORDER BY i*j doesn't trigger trimming even when hints are set
-public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSet {
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class GroupByTrimmingTest extends CustomDataQueryClusterIntegrationTest 
{
 
   static final int FILES_NO = 4;
   static final int RECORDS_NO = 1000;
@@ -60,42 +56,32 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   static final String J_COL = "j";
   static final int SERVERS_NO = 2;
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    startZk();
-    startController();
-    startServers(SERVERS_NO);
-    startBroker();
+  @Override
+  public String getTableName() {
+    return "GroupByTrimmingTest";
+  }
 
-    Schema schema = new 
Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+  @Override
+  public Schema createSchema() {
+    return new Schema.SchemaBuilder().setSchemaName(getTableName())
         .addSingleValueDimension(I_COL, FieldSpec.DataType.INT)
         .addSingleValueDimension(J_COL, FieldSpec.DataType.LONG)
         .build();
-    addSchema(schema);
-    TableConfig tableConfig = createOfflineTableConfig();
-    addTableConfig(tableConfig);
-
-    List<File> avroFiles = createAvroFile(_tempDir);
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
-    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
-
-    // Wait for all documents loaded
-    TestUtils.waitForCondition(() -> 
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
-        60_000,
-        "Failed to load  documents", true, Duration.ofMillis(60_000 / 10));
-
-    setUseMultiStageQueryEngine(true);
+  }
 
-    Map<String, List<String>> map = 
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    return createAvroFile(_tempDir);
+  }
 
-    // make sure segments are split between multiple servers
-    assertEquals(map.size(), SERVERS_NO);
+  @Override
+  protected long getCountStarResult() {
+    return FILES_NO * RECORDS_NO;
   }
 
-  protected TableConfig createOfflineTableConfig() {
+  @Override
+  public TableConfig createOfflineTableConfig() {
     return new TableConfigBuilder(TableType.OFFLINE)
         .setTableName(getTableName())
         .setNumReplicas(getNumReplicas())
@@ -138,12 +124,17 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
       throws Exception {
     setUseMultiStageQueryEngine(true);
 
+    Map<String, List<String>> map = 
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+    // make sure segments are split between multiple servers
+    assertEquals(map.size(), SERVERS_NO);
+
     Connection conn = getPinotConnection();
-    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY i*j DESC LIMIT 5"));
+    assertTrimFlagNotSet(conn.execute(
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY i*j DESC LIMIT 5"));
 
     String options = "SET minSegmentGroupTrimSize=5; ";
     String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
-        + "FROM mytable GROUP BY i, j ORDER BY i*j DESC LIMIT 5 ";
+        + "FROM " + getTableName() + " GROUP BY i, j ORDER BY i*j DESC LIMIT 5 
";
 
     ResultSetGroup result = conn.execute(options + query);
     assertTrimFlagNotSet(result);
@@ -158,7 +149,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
             // <-- order by value is computed here, so trimming in upstream 
stages is not possible
             + "        PinotLogicalAggregate(group=[{0, 1}], 
agg#0=[COUNT($2)], aggType=[FINAL])\n"
             + "          PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "            LeafStageCombineOperator(table=[mytable])\n"
+            + "            LeafStageCombineOperator(table=[" + getTableName() 
+ "])\n"
             + "              StreamingInstanceResponse\n"
             + "                CombineGroupBy\n"
             + "                  GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -173,11 +164,12 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     setUseMultiStageQueryEngine(true);
 
     Connection conn = getPinotConnection();
-    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+    assertTrimFlagNotSet(conn.execute(
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY j DESC LIMIT 5"));
 
     String options = "SET minSegmentGroupTrimSize=5; ";
     String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
-        + "FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5 ";
+        + "FROM " + getTableName() + " GROUP BY i, j ORDER BY j DESC LIMIT 5 ";
 
     ResultSetGroup result = conn.execute(options + query);
     assertTrimFlagSet(result);
@@ -199,7 +191,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[1 DESC]],"
             + " limit=[5])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n" // <-- trimming happens here
@@ -215,11 +207,12 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(
-        conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER 
BY COUNT(*) DESC LIMIT 5"));
+        conn.execute(
+            "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j 
ORDER BY COUNT(*) DESC LIMIT 5"));
 
     String options = "SET minSegmentGroupTrimSize=5; ";
     String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
-        + "FROM mytable GROUP BY i, j ORDER BY count(*) DESC LIMIT 5 ";
+        + "FROM " + getTableName() + " GROUP BY i, j ORDER BY count(*) DESC 
LIMIT 5 ";
 
     ResultSetGroup result = conn.execute(options + query);
     assertTrimFlagSet(result);
@@ -244,7 +237,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[2 DESC]],"
             + " limit=[5])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n" //<-- trimming happens here
@@ -259,11 +252,12 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     setUseMultiStageQueryEngine(true);
 
     Connection conn = getPinotConnection();
-    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+    assertTrimFlagNotSet(conn.execute(
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY j DESC LIMIT 5"));
 
     String options = "SET minServerGroupTrimSize = 5; SET groupTrimThreshold = 
100; ";
     String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i, 
j, COUNT(*) "
-        + "FROM mytable "
+        + "FROM " + getTableName() + " "
         + "GROUP BY i, j "
         + "ORDER BY j DESC "
         + "LIMIT 5 ";
@@ -287,7 +281,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[1 DESC]],"
             + " limit=[5])\n"
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n"
-            + "          LeafStageCombineOperator(table=[mytable])\n"
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n"
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n" // <-- trimming happens here
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -302,12 +296,13 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     setUseMultiStageQueryEngine(true);
 
     Connection conn = getPinotConnection();
-    assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable 
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+    assertTrimFlagNotSet(conn.execute(
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY j DESC LIMIT 5"));
 
     // This case is tricky because intermediate results are hash-split among 
servers so one gets 50 rows on average.
     // That's the reason both limit and trim size needs to be so small.
     String query = "SELECT /*+ 
aggOptions(is_enable_group_trim='true',mse_min_group_trim_size='5') */ i, j, 
COUNT(*) "
-        + "FROM mytable "
+        + "FROM " + getTableName() + " "
         + "GROUP BY i, j "
         + "ORDER BY j DESC "
         + "LIMIT 5 ";
@@ -331,7 +326,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
             + "      PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], 
aggType=[FINAL], collations=[[1 DESC]],"
             + " limit=[5])\n" // receives 50-row-big blocks, trimming kicks in 
only if limit is lower
             + "        PinotLogicalExchange(distribution=[hash[0, 1]])\n" // 
splits blocks via hash distribution
-            + "          LeafStageCombineOperator(table=[mytable])\n" // no 
trimming happens 'below'
+            + "          LeafStageCombineOperator(table=[" + getTableName() + 
"])\n" // no trimming happens 'below'
             + "            StreamingInstanceResponse\n"
             + "              CombineGroupBy\n"
             + "                GroupBy(groupKeys=[[i, j]], 
aggregations=[[count(*)]])\n"
@@ -345,7 +340,9 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEFilteredGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, SUM(i) FILTER (WHERE i > 0) FROM mytable 
GROUP BY i, j ORDER BY i + j DESC LIMIT 5";
+    String query =
+        "SELECT i, j, SUM(i) FILTER (WHERE i > 0) FROM " + getTableName()
+            + " GROUP BY i, j ORDER BY i + j DESC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -378,7 +375,8 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i + j DESC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY i + j DESC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -408,7 +406,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEGroupsTrimmedAtSegmentLevelWithOrderBySomeGroupByKeysIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
j DESC LIMIT 5";
+    String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY 
i, j ORDER BY j DESC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -445,7 +443,8 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
 
     // trimming is safe on rows ordered by all group by keys (regardless of 
key order, direction or duplications)
     // but not when HAVING clause is present
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i 
> 50  ORDER BY i ASC, j ASC";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j 
HAVING i > 50  ORDER BY i ASC, j ASC";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -473,7 +472,8 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     setUseMultiStageQueryEngine(false);
 
     // trimming is safe on rows ordered by all group by keys (regardless of 
key order, direction or duplications)
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
j ASC, i DESC, j ASC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY j ASC, i DESC, j ASC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -505,7 +505,9 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     setUseMultiStageQueryEngine(false);
 
     // trimming is safe on rows ordered by all group by keys (regardless of 
key order, direction or duplications)
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, i, j ORDER 
BY j ASC, i DESC, j ASC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName()
+            + " GROUP BY i, i, j ORDER BY j ASC, i DESC, j ASC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -538,7 +540,8 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     setUseMultiStageQueryEngine(false);
 
     // trimming is safe on rows ordered by all group by keys (regardless of 
key order or direction)
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
count(*)*j ASC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY count(*)*j ASC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -570,7 +573,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i DESC  LIMIT 5";
+    String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY 
i, j ORDER BY i DESC  LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -595,7 +598,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
       throws Exception {
     // for SSQE server level == inter-segment level
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i, j  LIMIT 5";
+    String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY 
i, j ORDER BY i, j  LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -627,7 +630,8 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
       throws Exception {
     // for SSQE server level == inter-segment level
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
count(*)*j DESC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY count(*)*j DESC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -636,15 +640,6 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET 
groupTrimThreshold = 100; " + query);
     assertTrimFlagSet(result);
 
-    // Result, though unstable due to concurrent operations on IndexedTable, 
is similar to the following
-    // (which is not correct):
-    //i[INT],j[LONG],count(*)[LONG]
-    //98,\t998,\t4
-    //94,\t994,\t4
-    //90,\t990,\t4
-    //86,\t986,\t4
-    //79,\t979,\t4
-
     assertEquals(toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false),
         "BROKER_REDUCE(sort:[times(count(*),j) 
DESC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n"
             + "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
@@ -659,7 +654,9 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysAndHavingIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i 
> 50  ORDER BY i ASC, j ASC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName()
+            + " GROUP BY i, j HAVING i > 50  ORDER BY i ASC, j ASC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -688,7 +685,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysIsSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
i, j LIMIT 5";
+    String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY 
i, j ORDER BY i, j LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -719,7 +716,7 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEGroupsTrimmedAtBrokerLevelOrderedBySomeGroupByKeysIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
j DESC LIMIT 5";
+    String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY 
i, j ORDER BY j DESC LIMIT 5";
 
     Connection conn = getPinotConnection();
     ResultSetGroup result1 = conn.execute(query);
@@ -751,7 +748,9 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysAndHavingIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i 
> 50  ORDER BY i ASC, j ASC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName()
+            + " GROUP BY i, j HAVING i > 50  ORDER BY i ASC, j ASC LIMIT 5";
 
     Connection conn = getPinotConnection();
     ResultSetGroup result1 = conn.execute(query);
@@ -779,7 +778,8 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   public void 
testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByAggregateIsNotSafe()
       throws Exception {
     setUseMultiStageQueryEngine(false);
-    String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY 
count(*)*j DESC LIMIT 5";
+    String query =
+        "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER 
BY count(*)*j DESC LIMIT 5";
 
     Connection conn = getPinotConnection();
     assertTrimFlagNotSet(conn.execute(query));
@@ -788,14 +788,6 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
     ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET 
groupTrimThreshold = 50; " + query);
     assertTrimFlagSet(result);
 
-    // result is similar to the following, but unstable:
-    //i[INT],j[LONG],count(*)[LONG]
-    //99,999,4
-    //98,998,4
-    //97,997,4
-    //96,996,4
-    //82,982,4
-
     assertEquals(toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false),
         "BROKER_REDUCE(sort:[times(count(*),j) DESC],limit:5," //<-- trimming 
happens here
             + "postAggregations:times(count(*),j)),\t1,\t0\n"
@@ -814,17 +806,4 @@ public class GroupByTrimmingIntegrationTest extends 
BaseClusterIntegrationTestSe
   private static void assertTrimFlagSet(ResultSetGroup result) {
     
assertTrue(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed());
   }
-
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    dropOfflineTable(DEFAULT_TABLE_NAME);
-
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-
-    FileUtils.deleteDirectory(_tempDir);
-  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
index ed1f439401d..d8672c44146 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
@@ -52,7 +52,7 @@ import org.testng.annotations.Test;
 import static org.apache.avro.Schema.create;
 import static org.apache.avro.Schema.createArray;
 import static org.apache.avro.Schema.createUnion;
-import static 
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toResultStr;
+import static 
org.apache.pinot.integration.tests.custom.GroupByOptionsTest.toResultStr;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RowExpressionIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RowExpressionTest.java
similarity index 72%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RowExpressionIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RowExpressionTest.java
index 47c3285c0bb..29972fb64fe 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RowExpressionIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RowExpressionTest.java
@@ -16,97 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet {
-  private static final String SCHEMA_FILE_NAME = 
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
 
-  @Override
-  protected String getSchemaFileName() {
-    return SCHEMA_FILE_NAME;
-  }
-
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    // Start the Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServer();
-    setupTenants();
-
-    // Create and upload the schema and table config
-    Schema schema = createSchema();
-    addSchema(schema);
-    TableConfig tableConfig = createOfflineTableConfig();
-    addTableConfig(tableConfig);
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class RowExpressionTest extends CustomDataQueryClusterIntegrationTest {
+  private static final String TABLE_NAME = "RowExpressionTest";
+  private static final String SCHEMA_FILE_NAME =
+      "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
 
-    // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
-
-    // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
-    uploadSegments(getTableName(), _tarDir);
-
-    // Set up the H2 connection
-    setUpH2Connection(avroFiles);
-
-    // Initialize the query generator
-    setUpQueryGenerator(avroFiles);
-
-    // Wait for all documents loaded
-    waitForAllDocsLoaded(600_000L);
-
-    // Use multi-stage query engine for all tests
-    setUseMultiStageQueryEngine(true);
+  @Override
+  public String getTableName() {
+    return TABLE_NAME;
   }
 
-  @BeforeMethod
   @Override
-  public void resetMultiStage() {
-    setUseMultiStageQueryEngine(true);
+  public Schema createSchema() {
+    try {
+      Schema schema = createSchema(SCHEMA_FILE_NAME);
+      schema.setSchemaName(getTableName());
+      return schema;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to load schema: " + SCHEMA_FILE_NAME, 
e);
+    }
   }
 
-  protected void setupTenants()
+  @Override
+  public List<File> createAvroFiles()
       throws Exception {
-    // Use default tenant setup
+    return unpackAvroData(_tempDir);
   }
 
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    dropOfflineTable(DEFAULT_TABLE_NAME);
-
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-
-    FileUtils.deleteDirectory(_tempDir);
+  @BeforeMethod
+  public void resetMultiStage() {
+    setUseMultiStageQueryEngine(true);
   }
 
   @Test
   public void testRowEqualityTwoFields()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay) = 
(201, 10)";
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE 
(AirTime, ArrDelay) = (201, 10)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -115,7 +75,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowEqualityThreeFields()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay, 
DepDelay) = (201, 10, 5)";
+    String query =
+        "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime, ArrDelay, 
DepDelay) = (201, 10, 5)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -124,7 +85,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowNotEquals()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay) <> 
(0, 0)";
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE 
(AirTime, ArrDelay) <> (0, 0)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     assertTrue(result.get("numRowsResultSet").asInt() > 0);
@@ -133,7 +94,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowGreaterThan()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) > (200, 230)";
+    String query =
+        "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime, 
ActualElapsedTime) > (200, 230)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -143,7 +105,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowGreaterThanOrEqual()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) >= (200, 230)";
+    String query =
+        "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime, 
ActualElapsedTime) >= (200, 230)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -153,7 +116,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowLessThan()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) < (100, 120)";
+    String query =
+        "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime, 
ActualElapsedTime) < (100, 120)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -163,7 +127,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowLessThanOrEqual()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) <= (100, 120)";
+    String query =
+        "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime, 
ActualElapsedTime) <= (100, 120)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -173,8 +138,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowWithFourFields()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable "
-        + "WHERE (AirTime, ArrDelay, DepDelay, Distance) > (200, 0, 0, 1000)";
+    String query = "SELECT COUNT(*) FROM " + getTableName()
+        + " WHERE (AirTime, ArrDelay, DepDelay, Distance) > (200, 0, 0, 1000)";
     JsonNode result = postQuery(query);
     assertNoError(result);
     assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -183,7 +148,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowWithMixedDataTypes()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirlineID, Carrier) > 
(20000, 'AA')";
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE 
(AirlineID, Carrier) > (20000, 'AA')";
     JsonNode result = postQuery(query);
     assertNoError(result);
     assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -193,7 +158,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   public void testKeysetPaginationUseCase()
       throws Exception {
     String query1 = "SELECT AirlineID, Carrier, AirTime "
-        + "FROM mytable "
+        + "FROM " + getTableName() + " "
         + "ORDER BY AirlineID, Carrier, AirTime "
         + "LIMIT 10";
     JsonNode result1 = postQuery(query1);
@@ -207,7 +172,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     String query2 = String.format(
         "SELECT AirlineID, Carrier, AirTime "
-        + "FROM mytable "
+        + "FROM " + getTableName() + " "
         + "WHERE (AirlineID, Carrier, AirTime) > (%d, '%s', %d) "
         + "ORDER BY AirlineID, Carrier, AirTime "
         + "LIMIT 10",
@@ -225,7 +190,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   public void testRowInComplexQuery()
       throws Exception {
     String query = "SELECT COUNT(*) FROM ("
-        + "  SELECT AirlineID, Carrier FROM mytable "
+        + "  SELECT AirlineID, Carrier FROM " + getTableName() + " "
         + "  WHERE (AirlineID, Carrier) > (20000, 'AA') "
         + "  ORDER BY AirlineID, Carrier LIMIT 100"
         + ") AS t";
@@ -238,7 +203,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   public void testRowWithCTE()
       throws Exception {
     String query = "WITH filtered AS ("
-        + "  SELECT AirlineID, Carrier, AirTime FROM mytable "
+        + "  SELECT AirlineID, Carrier, AirTime FROM " + getTableName() + " "
         + "  WHERE AirlineID > 19000"
         + ") "
         + "SELECT COUNT(*) FROM filtered "
@@ -251,7 +216,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testMultipleRowComparisons()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable "
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " "
         + "WHERE (AirTime, ActualElapsedTime) > (100, 120) "
         + "AND (AirTime, ActualElapsedTime) < (500, 600)";
     JsonNode result = postQuery(query);
@@ -262,7 +227,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowComparisonWithLiterals()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable "
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " "
         + "WHERE (201, 230) < (AirTime, ActualElapsedTime)";
     JsonNode result = postQuery(query);
     assertNoError(result);
@@ -273,7 +238,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   public void testExplainPlanWithRowExpression()
       throws Exception {
     String query = "EXPLAIN PLAN FOR "
-        + "SELECT * FROM mytable WHERE (AirTime, ActualElapsedTime) > (200, 
230) LIMIT 10";
+        + "SELECT * FROM " + getTableName() + " WHERE (AirTime, 
ActualElapsedTime) > (200, 230) LIMIT 10";
     JsonNode result = postQuery(query);
     assertNoError(result);
     String plan = result.get("resultTable").get("rows").get(0).get(1).asText();
@@ -283,7 +248,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowInSelectList()
       throws Exception {
-    String query = "SELECT (AirTime, ActualElapsedTime) FROM mytable LIMIT 10";
+    String query = "SELECT (AirTime, ActualElapsedTime) FROM " + 
getTableName() + " LIMIT 10";
     JsonNode result = postQuery(query);
     assertTrue(result.get("exceptions").size() > 0, "Expected validation 
error");
     String errorMessage = 
result.get("exceptions").get(0).get("message").asText();
@@ -294,7 +259,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowInGroupBy()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable GROUP BY (AirlineID, 
Carrier)";
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " GROUP BY 
(AirlineID, Carrier)";
     JsonNode result = postQuery(query);
     assertTrue(result.get("exceptions").size() > 0, "Expected validation 
error");
     String errorMessage = 
result.get("exceptions").get(0).get("message").asText();
@@ -306,7 +271,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowInOrderBy()
       throws Exception {
-    String query = "SELECT AirlineID, Carrier FROM mytable ORDER BY 
(AirlineID, Carrier) LIMIT 10";
+    String query = "SELECT AirlineID, Carrier FROM " + getTableName()
+        + " ORDER BY (AirlineID, Carrier) LIMIT 10";
     JsonNode result = postQuery(query);
     assertTrue(result.get("exceptions").size() > 0, "Expected validation 
error");
     String errorMessage = 
result.get("exceptions").get(0).get("message").asText();
@@ -318,7 +284,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testSingleSidedRowComparison()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) > 200";
+    String query = "SELECT COUNT(*) FROM " + getTableName()
+        + " WHERE (AirTime, ActualElapsedTime) > 200";
     JsonNode result = postQuery(query);
     assertTrue(result.get("exceptions").size() > 0, "Expected validation 
error");
     String errorMessage = 
result.get("exceptions").get(0).get("message").asText();
@@ -329,7 +296,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testMismatchedRowSizes()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) > (200, 230, 250)";
+    String query = "SELECT COUNT(*) FROM " + getTableName()
+        + " WHERE (AirTime, ActualElapsedTime) > (200, 230, 250)";
     JsonNode result = postQuery(query);
     assertTrue(result.get("exceptions").size() > 0, "Expected validation 
error");
     String errorMessage = 
result.get("exceptions").get(0).get("message").asText();
@@ -340,7 +308,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testEmptyRowExpression()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE () > ()";
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE () > ()";
     JsonNode result = postQuery(query);
     assertTrue(result.get("exceptions").size() > 0, "Expected validation 
error");
   }
@@ -348,7 +316,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowInFunctionCall()
       throws Exception {
-    String query = "SELECT SUM((AirTime, ActualElapsedTime)) FROM mytable";
+    String query = "SELECT SUM((AirTime, ActualElapsedTime)) FROM " + 
getTableName();
     JsonNode result = postQuery(query);
     assertTrue(result.get("exceptions").size() > 0, "Expected validation 
error");
     String errorMessage = 
result.get("exceptions").get(0).get("message").asText();
@@ -359,7 +327,8 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowWithNullComparison()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay) > 
(200, NULL)";
+    String query = "SELECT COUNT(*) FROM " + getTableName()
+        + " WHERE (AirTime, ArrDelay) > (200, NULL)";
     JsonNode result = postQuery(query);
     assertNoError(result);
   }
@@ -367,7 +336,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowComparisonSameValues()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable "
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " "
         + "WHERE (AirTime, ActualElapsedTime) >= (AirTime, ActualElapsedTime)";
     JsonNode result = postQuery(query);
     assertNoError(result);
@@ -381,7 +350,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
       throws Exception {
     // Test ROW in subquery WHERE clause
     String query = "SELECT COUNT(*) FROM ("
-        + "  SELECT * FROM mytable WHERE (AirTime, ActualElapsedTime) > (200, 
230)"
+        + "  SELECT * FROM " + getTableName() + " WHERE (AirTime, 
ActualElapsedTime) > (200, 230)"
         + ") AS subq";
     JsonNode result = postQuery(query);
     assertNoError(result);
@@ -391,7 +360,7 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowComparisonWithCalculatedFields()
       throws Exception {
-    String query = "SELECT COUNT(*) FROM mytable "
+    String query = "SELECT COUNT(*) FROM " + getTableName() + " "
         + "WHERE (AirTime * 2, ActualElapsedTime + 10) > (400, 240)";
     JsonNode result = postQuery(query);
     assertNoError(result);
@@ -401,12 +370,13 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowGreaterThanOrEqualVsExpanded()
       throws Exception {
-    String rowQuery = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) >= (200, 230)";
+    String rowQuery = "SELECT COUNT(*) FROM " + getTableName()
+        + " WHERE (AirTime, ActualElapsedTime) >= (200, 230)";
     JsonNode rowResult = postQuery(rowQuery);
     assertNoError(rowResult);
     long rowCount = 
rowResult.get("resultTable").get("rows").get(0).get(0).asLong();
 
-    String expandedQuery = "SELECT COUNT(*) FROM mytable "
+    String expandedQuery = "SELECT COUNT(*) FROM " + getTableName() + " "
         + "WHERE (AirTime > 200) OR ((AirTime = 200) AND (ActualElapsedTime > 
230)) "
         + "OR ((AirTime = 200) AND (ActualElapsedTime = 230))";
     JsonNode expandedResult = postQuery(expandedQuery);
@@ -419,12 +389,13 @@ public class RowExpressionIntegrationTest extends 
BaseClusterIntegrationTestSet
   @Test
   public void testRowEqualityVsExpanded()
       throws Exception {
-    String rowQuery = "SELECT COUNT(*) FROM mytable WHERE (AirTime, 
ActualElapsedTime) = (201, 230)";
+    String rowQuery = "SELECT COUNT(*) FROM " + getTableName()
+        + " WHERE (AirTime, ActualElapsedTime) = (201, 230)";
     JsonNode rowResult = postQuery(rowQuery);
     assertNoError(rowResult);
     long rowCount = 
rowResult.get("resultTable").get("rows").get(0).get(0).asLong();
 
-    String expandedQuery = "SELECT COUNT(*) FROM mytable "
+    String expandedQuery = "SELECT COUNT(*) FROM " + getTableName() + " "
         + "WHERE (AirTime = 201) AND (ActualElapsedTime = 230)";
     JsonNode expandedResult = postQuery(expandedQuery);
     assertNoError(expandedResult);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/StarTreeTest.java
similarity index 67%
rename from 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
rename to 
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/StarTreeTest.java
index 041b05be9a2..ef9f724016c 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/StarTreeTest.java
@@ -16,16 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.integration.tests.startree.SegmentInfoProvider;
 import org.apache.pinot.integration.tests.startree.StarTreeQueryGenerator;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -33,10 +34,10 @@ import 
org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
 import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
 import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -48,22 +49,18 @@ import static org.testng.Assert.assertTrue;
  * Integration test for Star-Tree based indexes.
  * <ul>
  *   <li>
- *     Set up the Pinot cluster and create two tables, one with default 
indexes, one with star tree indexes
+ *     Set up the Pinot cluster and create a table with star tree indexes
  *   </li>
  *   <li>
- *     Send queries to both the tables and assert that results match
- *   </li>
- *   <li>
- *     Query to reference table is sent with TOP 10000, and the comparator 
ensures that response from star tree is
- *     contained within the reference response. This is to avoid false 
failures when groups with same value are
- *     truncated due to TOP N
+ *     Send queries with and without star-tree and assert that results match
  *   </li>
  * </ul>
  */
-public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest 
{
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class StarTreeTest extends CustomDataQueryClusterIntegrationTest {
   public static final String FILTER_STARTREE_INDEX = "FILTER_STARTREE_INDEX";
   private static final String SCHEMA_FILE_NAME =
-          
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema";
+      "On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema";
   private static final int NUM_STAR_TREE_DIMENSIONS = 5;
   private static final int NUM_STAR_TREE_METRICS = 6;
   private static final List<AggregationFunctionType> 
AGGREGATION_FUNCTION_TYPES =
@@ -78,9 +75,38 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
   private StarTreeQueryGenerator _starTree1QueryGenerator;
   private StarTreeQueryGenerator _starTree2QueryGenerator;
 
+  // Fixed dimensions and metrics for the first star-tree
+  private final List<String> _starTree1Dimensions =
+      Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime", 
"CRSDepTime", "DivArrDelay");
+  private final List<String> _starTree1Metrics =
+      Arrays.asList("CarrierDelay", "DepDelay", "LateAircraftDelay", 
"ArrivalDelayGroups", "ArrDel15", "AirlineID");
+
+  // Randomly picked dimensions and metrics for the second star-tree 
(initialized in setUp)
+  private List<String> _starTree2Dimensions;
+  private List<String> _starTree2Metrics;
+
+  @Override
+  public String getTableName() {
+    return "StarTreeTest";
+  }
+
   @Override
-  protected String getSchemaFileName() {
-    return SCHEMA_FILE_NAME;
+  public Schema createSchema() {
+    try {
+      InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(SCHEMA_FILE_NAME);
+      Assert.assertNotNull(inputStream);
+      Schema schema = Schema.fromInputStream(inputStream);
+      schema.setSchemaName(getTableName());
+      return schema;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public List<File> createAvroFiles()
+      throws Exception {
+    return unpackAvroData(_tempDir);
   }
 
   // NOTE: Star-Tree and SegmentInfoProvider does not work on no-dictionary 
dimensions
@@ -89,71 +115,67 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
     return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay");
   }
 
-  @BeforeClass
-  public void setUp()
-      throws Exception {
-    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
-    // Start the Pinot cluster
-    startZk();
-    startController();
-    startBroker();
-    startServers(2);
+  @Override
+  public String getTimeColumnName() {
+    return "DaysSinceEpoch";
+  }
 
-    // Create and upload the schema and table config
+  @Override
+  public TableConfig createOfflineTableConfig() {
     Schema schema = createSchema();
-    addSchema(schema);
-
-    // Pick fixed dimensions and metrics for the first star-tree
-    List<String> starTree1Dimensions =
-        Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime", 
"CRSDepTime", "DivArrDelay");
-    List<String> starTree1Metrics =
-        Arrays.asList("CarrierDelay", "DepDelay", "LateAircraftDelay", 
"ArrivalDelayGroups", "ArrDel15", "AirlineID");
-    int starTree1MaxLeafRecords = 10;
 
     // Randomly pick some dimensions and metrics for the second star-tree
     // Exclude TotalAddGTime since it's a multi-value column, should not be in 
dimension split order.
     List<String> allDimensions = new ArrayList<>(schema.getDimensionNames());
     allDimensions.remove("TotalAddGTime");
     Collections.shuffle(allDimensions, _random);
-    List<String> starTree2Dimensions = allDimensions.subList(0, 
NUM_STAR_TREE_DIMENSIONS);
+    _starTree2Dimensions = allDimensions.subList(0, NUM_STAR_TREE_DIMENSIONS);
     List<String> allMetrics = new ArrayList<>(schema.getMetricNames());
     Collections.shuffle(allMetrics, _random);
-    List<String> starTree2Metrics = allMetrics.subList(0, 
NUM_STAR_TREE_METRICS);
+    _starTree2Metrics = allMetrics.subList(0, NUM_STAR_TREE_METRICS);
+
+    int starTree1MaxLeafRecords = 10;
     int starTree2MaxLeafRecords = 100;
+    int starTree3MaxLeafRecords = 10;
 
     // Tests StarTree aggregate for multi-value column
     List<String> starTree3Dimensions =
         Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime", 
"CRSDepTime", "DivArrDelay");
-    int starTree3MaxLeafRecords = 10;
 
-    TableConfig tableConfig = createOfflineTableConfig();
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(getTableName())
+        .setTimeColumnName(getTimeColumnName())
+        .setNoDictionaryColumns(getNoDictionaryColumns())
+        .setNumReplicas(getNumReplicas())
+        .setBrokerTenant(getBrokerTenant())
+        .setServerTenant(getServerTenant())
+        .build();
     tableConfig.getIndexingConfig().setStarTreeIndexConfigs(
-        Arrays.asList(getStarTreeIndexConfig(starTree1Dimensions, 
starTree1Metrics, starTree1MaxLeafRecords),
-            getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics, 
starTree2MaxLeafRecords),
+        Arrays.asList(
+            getStarTreeIndexConfig(_starTree1Dimensions, _starTree1Metrics, 
starTree1MaxLeafRecords),
+            getStarTreeIndexConfig(_starTree2Dimensions, _starTree2Metrics, 
starTree2MaxLeafRecords),
             getStarTreeIndexConfigForMVColAgg(starTree3Dimensions, 
starTree3MaxLeafRecords)));
-    addTableConfig(tableConfig);
 
-    // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
-
-    // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, 
schema, 0, _segmentDir, _tarDir);
-    uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+    return tableConfig;
+  }
 
-    // Set up the query generators
-    SegmentInfoProvider segmentInfoProvider = new 
SegmentInfoProvider(_tarDir.getPath());
-    List<String> aggregationFunctions = new 
ArrayList<>(AGGREGATION_FUNCTION_TYPES.size());
-    for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
-      aggregationFunctions.add(functionType.getName());
+  /**
+   * Initialize query generators after segments have been built and uploaded.
+   * This is called lazily before the first test that needs them.
+   */
+  private void ensureQueryGeneratorsInitialized()
+      throws Exception {
+    if (_starTree1QueryGenerator == null || _starTree2QueryGenerator == null) {
+      List<String> aggregationFunctions = new 
ArrayList<>(AGGREGATION_FUNCTION_TYPES.size());
+      for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
+        aggregationFunctions.add(functionType.getName());
+      }
+      SegmentInfoProvider segmentInfoProvider = new 
SegmentInfoProvider(_tarDir.getPath());
+      _starTree1QueryGenerator = new StarTreeQueryGenerator(getTableName(), 
_starTree1Dimensions, _starTree1Metrics,
+          segmentInfoProvider.getSingleValueDimensionValuesMap(), 
aggregationFunctions, _random);
+      _starTree2QueryGenerator = new StarTreeQueryGenerator(getTableName(), 
_starTree2Dimensions, _starTree2Metrics,
+          segmentInfoProvider.getSingleValueDimensionValuesMap(), 
aggregationFunctions, _random);
     }
-    _starTree1QueryGenerator = new StarTreeQueryGenerator(DEFAULT_TABLE_NAME, 
starTree1Dimensions, starTree1Metrics,
-        segmentInfoProvider.getSingleValueDimensionValuesMap(), 
aggregationFunctions, _random);
-    _starTree2QueryGenerator = new StarTreeQueryGenerator(DEFAULT_TABLE_NAME, 
starTree2Dimensions, starTree2Metrics,
-        segmentInfoProvider.getSingleValueDimensionValuesMap(), 
aggregationFunctions, _random);
-
-    // Wait for all documents loaded
-    waitForAllDocsLoaded(600_000L);
   }
 
   private static StarTreeIndexConfig getStarTreeIndexConfig(List<String> 
dimensions, List<String> metrics,
@@ -186,6 +208,7 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
   public void testGeneratedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    ensureQueryGeneratorsInitialized();
     for (int i = 0; i < NUM_QUERIES_TO_GENERATE; i += 2) {
       testStarQuery(_starTree1QueryGenerator.nextQuery(), false);
       testStarQuery(_starTree2QueryGenerator.nextQuery(), false);
@@ -196,19 +219,22 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
   public void testHardCodedQueries(boolean useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String tableName = getTableName();
+
     // This query can test the case of one predicate matches all the child 
nodes but star-node cannot be used because
     // the predicate is included as remaining predicate from another branch
-    String starQuery = "SELECT DepTimeBlk, COUNT(*) FROM mytable "
-        + "WHERE CRSDepTime BETWEEN 1137 AND 1849 AND DivArrDelay > 218 AND 
CRSDepTime NOT IN (35, 1633, 1457, 140) "
+    String starQuery = "SELECT DepTimeBlk, COUNT(*) FROM " + tableName
+        + " WHERE CRSDepTime BETWEEN 1137 AND 1849 AND DivArrDelay > 218 AND 
CRSDepTime NOT IN (35, 1633, 1457, 140) "
         + "AND LongestAddGTime NOT IN (17, 105, 20, 22) GROUP BY DepTimeBlk 
ORDER BY DepTimeBlk";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
 
     // Test MIN, MAX, SUM rewrite on LONG col
-    starQuery =
-        "SELECT MIN(AirlineID), MAX(AirlineID), SUM(AirlineID) FROM mytable 
WHERE CRSDepTime BETWEEN 1137 AND 1849";
+    starQuery = "SELECT MIN(AirlineID), MAX(AirlineID), SUM(AirlineID) FROM " 
+ tableName
+        + " WHERE CRSDepTime BETWEEN 1137 AND 1849";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
 
-    starQuery = "SET enableNullHandling=true; SELECT COUNT(DivArrDelay) FROM 
mytable WHERE DivArrDelay > 218";
+    starQuery = "SET enableNullHandling=true; SELECT COUNT(DivArrDelay) FROM " 
+ tableName
+        + " WHERE DivArrDelay > 218";
     testStarQuery(starQuery, false);
   }
 
@@ -216,20 +242,19 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
   public void testHardCodedFilteredAggQueries(boolean useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
-    String starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE 
CRSDepTime = 35) FROM mytable "
-        + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
-    // Don't verify that the query plan uses StarTree index, as this query 
results in FILTER_EMPTY in the query plan.
-    // This is still a valuable test, as it caught a bug where only the 
subFilterContext was being preserved through
-    // AggregationFunctionUtils#buildFilteredAggregationInfos
+    String tableName = getTableName();
+
+    String starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE 
CRSDepTime = 35) FROM " + tableName
+        + " WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
     testStarQuery(starQuery, false);
 
     // Ensure the filtered agg and unfiltered agg can co-exist in one query
-    starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE 
DivArrDelay > 20) FROM mytable "
-        + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+    starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE 
DivArrDelay > 20) FROM " + tableName
+        + " WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
 
-    starQuery = "SELECT DepTimeBlk, COUNT(*) FILTER (WHERE CRSDepTime != 35) 
FROM mytable "
-        + "GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+    starQuery = "SELECT DepTimeBlk, COUNT(*) FILTER (WHERE CRSDepTime != 35) 
FROM " + tableName
+        + " GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
   }
 
@@ -237,24 +262,25 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
   public void testMultiValueColumnAggregations(boolean 
useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String tableName = getTableName();
 
-    String starQuery = "SELECT COUNTMV(TotalAddGTime), SUMMV(TotalAddGTime), 
AVGMV(TotalAddGTime) FROM mytable";
+    String starQuery = "SELECT COUNTMV(TotalAddGTime), SUMMV(TotalAddGTime), 
AVGMV(TotalAddGTime) FROM " + tableName;
     testStarQuery(starQuery, !useMultiStageQueryEngine);
 
-    starQuery = "SELECT OriginCityName, COUNTMV(TotalAddGTime), 
AVGMV(TotalAddGTime), SUMMV(TotalAddGTime) "
-        + "FROM mytable GROUP BY OriginCityName ORDER BY OriginCityName";
+    starQuery = "SELECT OriginCityName, COUNTMV(TotalAddGTime), 
AVGMV(TotalAddGTime), SUMMV(TotalAddGTime) FROM "
+        + tableName + " GROUP BY OriginCityName ORDER BY OriginCityName";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
 
-    starQuery = "SELECT DepTimeBlk, SUMMV(TotalAddGTime), AVGMV(TotalAddGTime) 
FROM mytable "
-        + "WHERE CRSDepTime > 1000 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+    starQuery = "SELECT DepTimeBlk, SUMMV(TotalAddGTime), AVGMV(TotalAddGTime) 
FROM " + tableName
+        + " WHERE CRSDepTime > 1000 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
 
-    starQuery = "SELECT OriginCityName, DepTimeBlk, SUMMV(TotalAddGTime) FROM 
mytable "
-        + "GROUP BY OriginCityName, DepTimeBlk ORDER BY OriginCityName, 
DepTimeBlk LIMIT 100";
+    starQuery = "SELECT OriginCityName, DepTimeBlk, SUMMV(TotalAddGTime) FROM 
" + tableName
+        + " GROUP BY OriginCityName, DepTimeBlk ORDER BY OriginCityName, 
DepTimeBlk LIMIT 100";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
 
-    starQuery = "SELECT CRSDepTime, AVGMV(TotalAddGTime) FROM mytable "
-        + "WHERE CRSDepTime BETWEEN 800 AND 1200 AND DivArrDelay < 100 "
+    starQuery = "SELECT CRSDepTime, AVGMV(TotalAddGTime) FROM " + tableName
+        + " WHERE CRSDepTime BETWEEN 800 AND 1200 AND DivArrDelay < 100 "
         + "GROUP BY CRSDepTime ORDER BY CRSDepTime";
     testStarQuery(starQuery, !useMultiStageQueryEngine);
   }
@@ -297,17 +323,4 @@ public class StarTreeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
             + "Star Query: %s\nStar Response: %s\nReference Query: 
%s\nReference Response: %s\nRandom Seed: %d",
         starQuery, starResponse, referenceQuery, referenceResponse, 
_randomSeed));
   }
-
-  @AfterClass
-  public void tearDown()
-      throws Exception {
-    dropOfflineTable(DEFAULT_TABLE_NAME);
-
-    stopServer();
-    stopBroker();
-    stopController();
-    stopZk();
-
-    FileUtils.deleteDirectory(_tempDir);
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to