This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 54bea86f89c Migrate GroupByOptions, GroupByTrimming, and RowExpression
integration tests to custom package (#17857)
54bea86f89c is described below
commit 54bea86f89c082981f839622fc70b211aa073a5f
Author: Xiang Fu <[email protected]>
AuthorDate: Thu Mar 12 01:24:10 2026 -0700
Migrate GroupByOptions, GroupByTrimming, and RowExpression integration
tests to custom package (#17857)
Move these tests to share a single Pinot cluster via
CustomDataQueryClusterIntegrationTest
(@BeforeSuite/@AfterSuite lifecycle) instead of each test starting its own
cluster. This
reduces total integration test setup time by eliminating 3 redundant
cluster startups.
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../AggregateMetricsClusterIntegrationTest.java | 125 -------------
.../GroupByEnableTrimOptionIntegrationTest.java | 9 +-
.../StarTreeFunctionParametersIntegrationTest.java | 5 +-
.../tests/custom/AggregateMetricsTest.java | 133 +++++++++++++
.../BigNumberOfSegmentsTest.java} | 129 ++++---------
.../CLPEncodingRealtimeTest.java} | 128 +++++--------
.../GroupByOptionsTest.java} | 129 ++++++-------
.../GroupByTrimmingTest.java} | 173 ++++++++---------
.../tests/custom/MultiColumnTextIndicesTest.java | 2 +-
.../RowExpressionTest.java} | 163 +++++++---------
.../StarTreeTest.java} | 205 +++++++++++----------
11 files changed, 542 insertions(+), 659 deletions(-)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
deleted file mode 100644
index b7ba0103ab3..00000000000
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.IndexingConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration test that enables aggregate metrics for the LLC real-time table.
- */
-public class AggregateMetricsClusterIntegrationTest extends
BaseClusterIntegrationTestSet {
-
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
-
- // Start the Pinot cluster
- startZk();
- startController();
- startBroker();
- startServer();
-
- // Start Kafka
- startKafka();
-
- // Unpack the Avro files
- List<File> avroFiles = unpackAvroData(_tempDir);
-
- // Create and upload the schema and table config with reduced number of
columns and aggregate metrics on
- Schema schema =
- new
Schema.SchemaBuilder().setSchemaName(getTableName()).addSingleValueDimension("Carrier",
DataType.STRING)
- .addSingleValueDimension("Origin",
DataType.STRING).addMetric("AirTime", DataType.LONG)
- .addMetric("ArrDelay", DataType.DOUBLE)
- .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH",
"1:DAYS").build();
- addSchema(schema);
- TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
- IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
- indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
-
indexingConfig.setInvertedIndexColumns(Collections.singletonList("Origin"));
- indexingConfig.setNoDictionaryColumns(Arrays.asList("AirTime",
"ArrDelay"));
-
indexingConfig.setRangeIndexColumns(Collections.singletonList("DaysSinceEpoch"));
- indexingConfig.setBloomFilterColumns(Collections.singletonList("Origin"));
- indexingConfig.setAggregateMetrics(true);
- addTableConfig(tableConfig);
-
- // Push data into Kafka
- pushAvroIntoKafka(avroFiles);
-
- // Set up the H2 connection
- setUpH2Connection(avroFiles);
-
- // Wait for all documents loaded
- waitForAllDocsLoaded(600_000L);
- }
-
- @Override
- protected void waitForAllDocsLoaded(long timeoutMs) {
- // NOTE: For aggregate metrics, we need to test the aggregation result
instead of the document count because
- // documents can be merged during ingestion.
- String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
- TestUtils.waitForCondition(aVoid -> {
- try {
- JsonNode queryResult = postQuery(sql);
- JsonNode aggregationResults =
queryResult.get("resultTable").get("rows").get(0);
- return aggregationResults.get(0).asInt() == -165429728 &&
aggregationResults.get(1).asInt() == -175625957;
- } catch (Exception e) {
- return null;
- }
- }, 100L, timeoutMs, "Failed to load all documents");
- }
-
- @Test(dataProvider = "useBothQueryEngines")
- public void testQueries(boolean useMultiStageQueryEngine)
- throws Exception {
- setUseMultiStageQueryEngine(useMultiStageQueryEngine);
- String query = "SELECT SUM(AirTime), SUM(ArrDelay) FROM mytable";
- testQuery(query);
- query = "SELECT SUM(AirTime), DaysSinceEpoch FROM mytable GROUP BY
DaysSinceEpoch ORDER BY SUM(AirTime) DESC";
- testQuery(query);
- query = "SELECT Origin, SUM(ArrDelay) FROM mytable WHERE Carrier = 'AA'
GROUP BY Origin ORDER BY Origin";
- testQuery(query);
- }
-
- @AfterClass
- public void tearDown()
- throws Exception {
- dropRealtimeTable(getTableName());
- stopServer();
- stopBroker();
- stopController();
- stopKafka();
- stopZk();
- FileUtils.deleteDirectory(_tempDir);
- }
-}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
index 4f534e458c0..98027bc7d66 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.integration.tests.custom.GroupByOptionsTest;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -41,7 +42,7 @@ import org.testng.annotations.Test;
import static
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
-// similar to GroupByOptionsIntegrationTest but this test verifies that
default enable group trim option works even
+// similar to GroupByOptionsTest but this test verifies that default enable
group trim option works even
// if hint is not set in the query
public class GroupByEnableTrimOptionIntegrationTest extends
BaseClusterIntegrationTestSet {
@@ -69,7 +70,7 @@ public class GroupByEnableTrimOptionIntegrationTest extends
BaseClusterIntegrati
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);
- List<File> avroFiles =
GroupByOptionsIntegrationTest.createAvroFile(_tempDir);
+ List<File> avroFiles = GroupByOptionsTest.createAvroFile(_tempDir);
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
@@ -206,8 +207,8 @@ public class GroupByEnableTrimOptionIntegrationTest extends
BaseClusterIntegrati
JsonNode result = postV2Query(sql);
JsonNode plan = postV2Query(option + " set explainAskingServers=true;
explain plan for " + query);
- Assert.assertEquals(GroupByOptionsIntegrationTest.toResultStr(result),
expectedResult);
- Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan,
true), expectedPlan);
+ Assert.assertEquals(GroupByOptionsTest.toResultStr(result),
expectedResult);
+ Assert.assertEquals(GroupByOptionsTest.toExplainStr(plan, true),
expectedPlan);
}
private JsonNode postV2Query(String query)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
index 9bf9da4e18c..097255cf821 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeFunctionParametersIntegrationTest.java
@@ -28,6 +28,7 @@ import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.integration.tests.custom.StarTreeTest;
import org.apache.pinot.segment.spi.Constants;
import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
@@ -226,7 +227,7 @@ public class StarTreeFunctionParametersIntegrationTest
extends BaseClusterIntegr
*/
private void checkQueryDoesNotUseStarTreeIndex(String query, int
expectedResult) throws Exception {
JsonNode explainPlan = postQuery("EXPLAIN PLAN FOR " + query);
-
assertFalse(explainPlan.toString().contains(StarTreeClusterIntegrationTest.FILTER_STARTREE_INDEX));
+
assertFalse(explainPlan.toString().contains(StarTreeTest.FILTER_STARTREE_INDEX));
assertEquals(getDistinctCountResult(query), expectedResult);
}
@@ -244,7 +245,7 @@ public class StarTreeFunctionParametersIntegrationTest
extends BaseClusterIntegr
} catch (Exception e) {
throw new RuntimeException(e);
}
- return
result.toString().contains(StarTreeClusterIntegrationTest.FILTER_STARTREE_INDEX);
+ return
result.toString().contains(StarTreeTest.FILTER_STARTREE_INDEX);
}, 1000L, 10_000L, "Failed to use star-tree index for query: " + query
);
assertEquals(getDistinctCountResult(query), expectedResult);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/AggregateMetricsTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/AggregateMetricsTest.java
new file mode 100644
index 00000000000..d35e4f40e69
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/AggregateMetricsTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.custom;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Integration test that enables aggregate metrics for the LLC real-time table.
+ */
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class AggregateMetricsTest extends
CustomDataQueryClusterIntegrationTest {
+
+ private static final long EXPECTED_SUM_AIR_TIME = -165429728L;
+ private static final long EXPECTED_SUM_ARR_DELAY = -175625957L;
+
+ @Override
+ public String getTableName() {
+ return "AggregateMetricsTest";
+ }
+
+ @Override
+ public boolean isRealtimeTable() {
+ return true;
+ }
+
+ @Override
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
+ .addSingleValueDimension("Carrier", DataType.STRING)
+ .addSingleValueDimension("Origin", DataType.STRING)
+ .addMetric("AirTime", DataType.LONG)
+ .addMetric("ArrDelay", DataType.DOUBLE)
+ .addDateTime("DaysSinceEpoch", DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+ .build();
+ }
+
+ @Override
+ public List<File> createAvroFiles()
+ throws Exception {
+ return unpackAvroData(_tempDir);
+ }
+
+ @Override
+ protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
+ TableConfig tableConfig = super.createRealtimeTableConfig(sampleAvroFile);
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ indexingConfig.setSortedColumn(Collections.singletonList("Carrier"));
+
indexingConfig.setInvertedIndexColumns(Collections.singletonList("Origin"));
+ indexingConfig.setNoDictionaryColumns(Arrays.asList("AirTime",
"ArrDelay"));
+
indexingConfig.setRangeIndexColumns(Collections.singletonList("DaysSinceEpoch"));
+ indexingConfig.setBloomFilterColumns(Collections.singletonList("Origin"));
+ indexingConfig.setAggregateMetrics(true);
+ return tableConfig;
+ }
+
+ @Nullable
+ @Override
+ protected String getSortedColumn() {
+ return null;
+ }
+
+ @Override
+ public String getTimeColumnName() {
+ return "DaysSinceEpoch";
+ }
+
+ @Override
+ protected void waitForAllDocsLoaded(long timeoutMs) {
+ // For aggregate metrics, documents can be merged during ingestion, so we
check aggregation results
+ // instead of document count.
+ String sql = "SELECT SUM(AirTime), SUM(ArrDelay) FROM " + getTableName();
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ JsonNode queryResult = postQuery(sql);
+ JsonNode aggregationResults =
queryResult.get("resultTable").get("rows").get(0);
+ return aggregationResults.get(0).asLong() == EXPECTED_SUM_AIR_TIME
+ && aggregationResults.get(1).asLong() == EXPECTED_SUM_ARR_DELAY;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ }
+
+ @Test
+ public void testAggregateMetricsQueries()
+ throws Exception {
+ // Test total aggregation
+ JsonNode result = postQuery("SELECT SUM(AirTime), SUM(ArrDelay) FROM " +
getTableName());
+ JsonNode rows = result.get("resultTable").get("rows").get(0);
+ assertEquals(rows.get(0).asLong(), EXPECTED_SUM_AIR_TIME);
+ assertEquals(rows.get(1).asLong(), EXPECTED_SUM_ARR_DELAY);
+
+ // Test group by with order
+ result = postQuery("SELECT SUM(AirTime), DaysSinceEpoch FROM " +
getTableName()
+ + " GROUP BY DaysSinceEpoch ORDER BY SUM(AirTime) DESC LIMIT 1");
+ assertEquals(result.get("exceptions").size(), 0);
+
+ // Test filter with group by
+ result = postQuery("SELECT Origin, SUM(ArrDelay) FROM " + getTableName()
+ + " WHERE Carrier = 'AA' GROUP BY Origin ORDER BY Origin LIMIT 10");
+ assertEquals(result.get("exceptions").size(), 0);
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BigNumberOfSegmentsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BigNumberOfSegmentsTest.java
similarity index 61%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BigNumberOfSegmentsIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BigNumberOfSegmentsTest.java
index aaf8d5ec009..426925545ba 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BigNumberOfSegmentsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/BigNumberOfSegmentsTest.java
@@ -16,12 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -30,16 +29,12 @@ import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
@@ -50,7 +45,8 @@ import static
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.get
* Test is rather slow (1 minute+) and thus disabled by default.
* Still, it can be useful to run it manually.
*/
-public class BigNumberOfSegmentsIntegrationTest extends
BaseClusterIntegrationTestSet {
+@Test(suiteName = "CustomClusterIntegrationTest", enabled = false)
+public class BigNumberOfSegmentsTest extends
CustomDataQueryClusterIntegrationTest {
static final int FILES_NO = 1000;
static final int RECORDS_NO = 5;
@@ -60,88 +56,52 @@ public class BigNumberOfSegmentsIntegrationTest extends
BaseClusterIntegrationTe
static final String FLOAT_COL = "f";
static final String DOUBLE_COL = "d";
static final int STR_COL_NUM = 200;
+ private static final String TIME_COL = "ts";
- @BeforeClass
- public void setUp()
- throws Exception {
- System.out.println("Pid is " + ProcessHandle.current().pid());
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- startZk();
- startKafka();
- startController();
- startServer();
- startBroker();
+ @Override
+ public String getTableName() {
+ return "BigNumberOfSegmentsTest";
+ }
+ @Override
+ public Schema createSchema() {
Schema.SchemaBuilder builder = new Schema.SchemaBuilder()
- .setSchemaName(DEFAULT_SCHEMA_NAME)
+ .setSchemaName(getTableName())
.addSingleValueDimension(INT_COL, FieldSpec.DataType.INT)
.addSingleValueDimension(LONG_COL, FieldSpec.DataType.LONG)
.addSingleValueDimension(FLOAT_COL, FieldSpec.DataType.FLOAT)
.addSingleValueDimension(DOUBLE_COL, FieldSpec.DataType.DOUBLE)
- .addDateTimeField(DEFAULT_TIME_COLUMN_NAME,
FieldSpec.DataType.TIMESTAMP, "TIMESTAMP", "1:MILLISECONDS");
+ .addDateTimeField(TIME_COL, FieldSpec.DataType.TIMESTAMP, "TIMESTAMP",
"1:MILLISECONDS");
for (int i = 0; i < STR_COL_NUM; i++) {
builder.addSingleValueDimension(STR_COL_PREFIX + i,
FieldSpec.DataType.STRING);
}
- Schema schema = builder.build();
-
- List<File> avroFiles = createAvroFiles(_tempDir);
-
- addSchema(schema);
- TableConfig tableConfig = createOfflineTableConfig(schema);
- addTableConfig(tableConfig);
-
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
- uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
-
- // Wait for all documents loaded
- TestUtils.waitForCondition(() ->
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
- 600_000,
- "Failed to load documents", true, Duration.ofMillis(60_000 / 10));
-
- setUseMultiStageQueryEngine(true);
+ return builder.build();
}
- // to slow for CI (1+ minute)
- @Test(enabled = false)
- public void testCreateManySegments()
+ @Override
+ public List<File> createAvroFiles()
throws Exception {
- JsonNode node = postV2Query("select sum(i) + sum(j) + sum(d), count(*)
from " + DEFAULT_TABLE_NAME);
-
- assertNoError(node);
+ return createAvroData(_tempDir);
}
- protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
- AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
- return new TableConfigBuilder(TableType.REALTIME)
- .setTableName(getTableName())
- .setTimeColumnName(getTimeColumnName())
- .setInvertedIndexColumns(getInvertedIndexColumns())
- .setRangeIndexColumns(getRangeIndexColumns())
- .setFieldConfigList(getFieldConfigs())
- .setNumReplicas(getNumReplicas())
- .setSegmentVersion(getSegmentVersion())
- .setLoadMode(getLoadMode())
- .setTaskConfig(getTaskConfig())
- .setBrokerTenant(getBrokerTenant())
- .setServerTenant(getServerTenant())
- .setIngestionConfig(getIngestionConfig())
- .setQueryConfig(getQueryConfig())
- .setStreamConfigs(getStreamConfigs())
- .setNullHandlingEnabled(getNullHandlingEnabled())
- .build();
+ @Override
+ public int getNumAvroFiles() {
+ return FILES_NO;
}
- protected int getRealtimeSegmentFlushSize() {
- return RECORDS_NO;
+ @Override
+ protected long getCountStarResult() {
+ return (long) FILES_NO * RECORDS_NO;
}
- TableConfig createOfflineTableConfig(Schema schema) {
+ @Override
+ public TableConfig createOfflineTableConfig() {
+ Schema schema = createSchema();
return new TableConfigBuilder(TableType.OFFLINE)
.setTableName(getTableName())
- .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME)
+ .setTimeColumnName(TIME_COL)
.setNumReplicas(getNumReplicas())
.setBrokerTenant(getBrokerTenant())
.setRetentionTimeUnit("DAYS")
@@ -150,37 +110,20 @@ public class BigNumberOfSegmentsIntegrationTest extends
BaseClusterIntegrationTe
.build();
}
- @Override
- protected List<String> getInvertedIndexColumns() {
- return Arrays.asList(FLOAT_COL, INT_COL);
- }
-
- @Override
- protected List<String> getRangeIndexColumns() {
- return Arrays.asList(INT_COL);
- }
-
- private JsonNode postV2Query(String query)
+ // Too slow for CI (1+ minute)
+ @Test(enabled = false)
+ public void testCreateManySegments()
throws Exception {
- return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), true),
null,
- getExtraQueryProperties());
+ JsonNode node = postQuery("SELECT sum(i) + sum(j) + sum(d), count(*) FROM
" + getTableName(),
+ getBrokerQueryApiUrl(getBrokerBaseApiUrl(), true), null,
getExtraQueryProperties());
+ assertEquals(node.get("exceptions").size(), 0);
}
- @AfterClass
- public void tearDown()
- throws Exception {
- dropOfflineTable(DEFAULT_TABLE_NAME);
-
- stopServer();
- stopBroker();
- stopController();
- stopKafka();
- stopZk();
-
- FileUtils.deleteDirectory(_tempDir);
+ private static void assertEquals(int actual, int expected) {
+ org.testng.Assert.assertEquals(actual, expected);
}
- private static List<File> createAvroFiles(File tempDir)
+ private static List<File> createAvroData(File tempDir)
throws IOException {
// create avro schema
@@ -190,7 +133,7 @@ public class BigNumberOfSegmentsIntegrationTest extends
BaseClusterIntegrationTe
fields.add(new Field(LONG_COL, create(Type.LONG), null, null));
fields.add(new Field(FLOAT_COL, create(Type.FLOAT), null, null));
fields.add(new Field(DOUBLE_COL, create(Type.DOUBLE), null, null));
- fields.add(new Field(DEFAULT_TIME_COLUMN_NAME, create(Type.LONG), null,
null));
+ fields.add(new Field(TIME_COL, create(Type.LONG), null, null));
for (int i = 0; i < STR_COL_NUM; i++) {
fields.add(new Field(STR_COL_PREFIX + i, create(Type.STRING), null,
null));
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CLPEncodingRealtimeTest.java
similarity index 61%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CLPEncodingRealtimeTest.java
index e9f409bfe17..eec8b128045 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CLPEncodingRealtimeIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/CLPEncodingRealtimeTest.java
@@ -16,136 +16,94 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.config.table.FieldConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTestSet {
- private List<File> _avroFiles;
- private FieldConfig.CompressionCodec _selectedCompressionCodec;
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class CLPEncodingRealtimeTest extends
CustomDataQueryClusterIntegrationTest {
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
- _avroFiles = unpackAvroData(_tempDir);
-
- // Randomly select CLP or CLPV2 compression codec
- _selectedCompressionCodec =
- RANDOM.nextBoolean() ? FieldConfig.CompressionCodec.CLP :
FieldConfig.CompressionCodec.CLPV2;
-
- // Start the Pinot cluster
- startZk();
- startKafka();
- // Start a customized controller with more frequent realtime segment
validation
- startController();
- startBroker();
- startServer();
- pushAvroIntoKafka(_avroFiles);
-
- Schema schema = createSchema();
- addSchema(schema);
- TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
- addTableConfig(tableConfig);
-
- waitForAllDocsLoaded(600_000L);
- }
+ private final FieldConfig.CompressionCodec _selectedCompressionCodec =
+ RANDOM.nextBoolean() ? FieldConfig.CompressionCodec.CLP :
FieldConfig.CompressionCodec.CLPV2;
- @Nullable
@Override
- protected List<String> getInvertedIndexColumns() {
- return null;
+ public String getTableName() {
+ return "CLPEncodingRealtimeTest";
}
- @Nullable
@Override
- protected List<String> getRangeIndexColumns() {
- return null;
+ public boolean isRealtimeTable() {
+ return true;
}
- @Nullable
@Override
- protected List<String> getBloomFilterColumns() {
- return null;
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
+ .addSingleValueDimension("logLine", FieldSpec.DataType.STRING)
+ .addDateTimeField("timestampInEpoch", FieldSpec.DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
}
- @Nullable
@Override
- protected String getSortedColumn() {
- return null;
+ public List<File> createAvroFiles()
+ throws Exception {
+ return unpackTarData("clpEncodingITData.tar.gz", _tempDir);
}
@Override
- protected List<String> getNoDictionaryColumns() {
- return Collections.singletonList("logLine");
- }
-
- @Test
- public void testValues()
- throws Exception {
- Assert.assertEquals(getPinotConnection().execute(
- "SELECT count(*) FROM " + getTableName() + " WHERE
REGEXP_LIKE(logLine, '.*executor.*')").getResultSet(0)
- .getLong(0), 53);
+ public String getTimeColumnName() {
+ return "timestampInEpoch";
}
- @AfterClass
- public void tearDown()
- throws Exception {
- try {
- dropRealtimeTable(getTableName());
- stopServer();
- stopBroker();
- stopController();
- stopKafka();
- stopZk();
- } finally {
- FileUtils.deleteQuietly(_tempDir);
- }
+ @Override
+ protected long getCountStarResult() {
+ return 100;
}
+ @Override
protected int getRealtimeSegmentFlushSize() {
return 30;
}
+ @Nullable
@Override
- protected long getCountStarResult() {
- return 100;
+ protected List<String> getInvertedIndexColumns() {
+ return null;
}
+ @Nullable
@Override
- protected String getTableName() {
- return "clpEncodingIT";
+ protected List<String> getRangeIndexColumns() {
+ return null;
}
+ @Nullable
@Override
- protected String getAvroTarFileName() {
- return "clpEncodingITData.tar.gz";
+ protected List<String> getBloomFilterColumns() {
+ return null;
}
+ @Nullable
@Override
- protected String getSchemaFileName() {
- return "clpEncodingRealtimeIntegrationTestSchema.schema";
+ protected String getSortedColumn() {
+ return null;
}
@Override
- protected String getTimeColumnName() {
- return "timestampInEpoch";
+ protected List<String> getNoDictionaryColumns() {
+ return Collections.singletonList("logLine");
}
@Override
@@ -153,7 +111,6 @@ public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTe
List<FieldConfig> fieldConfigs = new ArrayList<>();
fieldConfigs.add(new
FieldConfig.Builder("logLine").withEncodingType(FieldConfig.EncodingType.RAW)
.withCompressionCodec(_selectedCompressionCodec).build());
-
return fieldConfigs;
}
@@ -164,7 +121,14 @@ public class CLPEncodingRealtimeIntegrationTest extends
BaseClusterIntegrationTe
IngestionConfig ingestionConfig = new IngestionConfig();
ingestionConfig.setTransformConfigs(transforms);
-
return ingestionConfig;
}
+
+ @Test
+ public void testValues()
+ throws Exception {
+ Assert.assertEquals(getPinotConnection().execute(
+ "SELECT count(*) FROM " + getTableName() + " WHERE
REGEXP_LIKE(logLine, '.*executor.*')").getResultSet(0)
+ .getLong(0), 53);
+ }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
similarity index 91%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
index 54efd3ccbd5..8d3c8d2672e 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByOptionsTest.java
@@ -16,13 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.File;
import java.io.IOException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -30,23 +29,20 @@ import java.util.Properties;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static
org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl;
-public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet {
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class GroupByOptionsTest extends CustomDataQueryClusterIntegrationTest {
static final int FILES_NO = 4;
static final int RECORDS_NO = 20;
@@ -55,42 +51,32 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
static final String RESULT_TABLE = "resultTable";
static final int SERVERS_NO = 2;
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- startZk();
- startController();
- startServers(SERVERS_NO);
- startBroker();
+ @Override
+ public String getTableName() {
+ return "GroupByOptionsTest";
+ }
- Schema schema = new
Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+ @Override
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
.addSingleValueDimension(I_COL, FieldSpec.DataType.INT)
.addSingleValueDimension(J_COL, FieldSpec.DataType.LONG)
.build();
- addSchema(schema);
- TableConfig tableConfig = createOfflineTableConfig();
- addTableConfig(tableConfig);
-
- List<File> avroFiles = createAvroFile(_tempDir);
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
- uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
-
- // Wait for all documents loaded
- TestUtils.waitForCondition(() ->
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
- 60_000,
- "Failed to load documents", true, Duration.ofMillis(60_000 / 10));
-
- setUseMultiStageQueryEngine(true);
+ }
- Map<String, List<String>> map =
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+ @Override
+ public List<File> createAvroFiles()
+ throws Exception {
+ return createAvroFile(_tempDir);
+ }
- // make sure segments are split between multiple servers
- Assert.assertEquals(map.size(), SERVERS_NO);
+ @Override
+ protected long getCountStarResult() {
+ return FILES_NO * RECORDS_NO;
}
- protected TableConfig createOfflineTableConfig() {
+ @Override
+ public TableConfig createOfflineTableConfig() {
return new TableConfigBuilder(TableType.OFFLINE)
.setTableName(getTableName())
.setNumReplicas(getNumReplicas())
@@ -130,6 +116,12 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void
testOrderByKeysIsNotPushedToFinalAggregationWhenGroupTrimHintIsDisabled()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
+ Map<String, List<String>> map =
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+ // make sure segments are split between multiple servers
+ Assert.assertEquals(map.size(), SERVERS_NO);
+
String trimDisabledPlan = "Execution Plan\n"
+ "LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC],
offset=[0], fetch=[1])\n"
+ " PinotLogicalSortExchange(distribution=[hash], collation=[[0, 1
DESC]], isSortOnSender=[false], "
@@ -137,7 +129,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC],
fetch=[1])\n"
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -171,6 +163,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void
testOrderByKeysIsPushedToFinalAggregationStageWithoutGroupTrimSize()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
// is_enable_group_trim enables V1-style trimming in leaf nodes,
// with numGroupsLimit and minSegmentGroupTrimSize,
// while group_trim_size - in final aggregation node
@@ -197,7 +191,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[0, 1 "
+ "DESC]], limit=[1])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -209,6 +203,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testOrderByKeysIsPushedToFinalAggregationStageWithGroupTrimSize()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
// is_enable_group_trim enables V1-style trimming in leaf nodes, with
numGroupsLimit and minSegmentGroupTrimSize,
// while group_trim_size - in final aggregation node .
// Same as above, to stabilize result here, we override global
numGroupsLimit option with num_groups_limit hint.
@@ -231,7 +227,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[0, 1 "
+ "DESC]], limit=[1])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -243,6 +239,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testOrderByKeysIsPushedToFinalAggregationStage()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
assertResultAndPlan(
// group_trim_size should sort and limit v2 aggregate output if order
by and limit is propagated
" ",
@@ -263,7 +261,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[0, "
+ "1]], limit=[3])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -275,6 +273,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testHavingOnKeysAndOrderByKeysIsPushedToFinalAggregationStage()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
assertResultAndPlan(
// group_trim_size should sort and limit v2 aggregate output if order
by and limit is propagated
" ",
@@ -296,7 +296,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[0, "
+ "1]], limit=[3])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -308,6 +308,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testGroupByKeysWithOffsetIsPushedToFinalAggregationStage()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
// if offset is set, leaf should return more results to intermediate stage
assertResultAndPlan(
"",
@@ -329,7 +331,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[0, "
+ "1]], limit=[4])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -342,6 +344,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testOrderByByKeysAndValuesIsPushedToFinalAggregationStage()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
// group_trim_size should sort and limit v2 aggregate output if order by
and limit is propagated
assertResultAndPlan(
" ",
@@ -364,7 +368,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[0 "
+ "DESC, 1 DESC, 2 DESC]], limit=[3])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -377,6 +381,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testOrderByKeyValueExpressionIsNotPushedToFinalAggregateStage()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
// Order by both expression based on keys and aggregate values.
// Expression & limit are not available until after aggregation so they
can't be pushed down.
// Because of that, group_trim_size is not applied.
@@ -402,7 +408,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ " LogicalProject(i=[$0], j=[$1], cnt=[$2], EXPR$3=[*(*($0,
$1), $2)])\n"
+ " PinotLogicalAggregate(group=[{0, 1}],
agg#0=[COUNT($2)], aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName()
+ "])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -415,6 +421,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testForGroupByOverJoinOrderByKeyIsPushedToAggregationLeafStage()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
// query uses V2 aggregate operator for both leaf and final stages because
of join
assertResultAndPlan(
" ",
@@ -442,18 +450,18 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
+ "1]], limit=[5])\n"
+ " LogicalJoin(condition=[true], joinType=[inner])\n"
+ " PinotLogicalExchange(distribution=[random])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" +
getTableName() + "])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
- + " SelectStreaming(table=[mytable],
totalDocs=[80])\n"
+ + " SelectStreaming(table=[" + getTableName()
+ "], totalDocs=[80])\n"
+ " Project(columns=[[i, j]])\n"
+ " DocIdSet(maxDocs=[40000])\n"
+ "
FilterMatchEntireSegment(numDocs=[80])\n"
+ " PinotLogicalExchange(distribution=[broadcast])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" +
getTableName() + "])\n"
+ " StreamingInstanceResponse\n"
+ " StreamingCombineSelect\n"
- + " SelectStreaming(table=[mytable],
totalDocs=[80])\n"
+ + " SelectStreaming(table=[" + getTableName()
+ "], totalDocs=[80])\n"
+ " Transform(expressions=[['0']])\n"
+ " Project(columns=[[]])\n"
+ " DocIdSet(maxDocs=[40000])\n"
@@ -461,7 +469,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
);
}
- public void assertResultAndPlan(String option, String query, String
expectedResult, String expectedPlan)
+ private void assertResultAndPlan(String option, String query, String
expectedResult, String expectedPlan)
throws Exception {
String sql = option
//disable timeout in debug
@@ -478,6 +486,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void
testExceptionIsThrownWhenErrorOnNumGroupsLimitHintIsSetAndLimitIsReachedV1()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
String query = " select /*+
aggOptions(num_groups_limit='1',error_on_num_groups_limit='true') */"
+ " i, j, count(*) as cnt "
+ " from " + getTableName()
@@ -490,6 +500,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void
testExceptionIsThrownWhenErrorOnNumGroupsLimitHintIsSetAndLimitIsReachedV2()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
String query = " set numGroupsLimit=1;"
+ " select /*+ aggOptions(error_on_num_groups_limit='true') */"
+ " i, j, count(*) as cnt "
@@ -503,6 +515,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void
testExceptionIsThrownWhenErrorOnNumGroupsLimitOptionIsSetAndLimitIsReachedV1()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
String query = " set errorOnNumGroupsLimit=true; set numGroupsLimit=1;"
+ " select i, j, count(*) as cnt "
+ " from " + getTableName()
@@ -515,6 +529,8 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void
testExceptionIsThrownWhenErrorOnNumGroupsLimitOptionIsSetAndLimitIsReachedV2()
throws Exception {
+ setUseMultiStageQueryEngine(true);
+
String query = " set errorOnNumGroupsLimit=true; "
+ "select /*+ aggOptions(num_groups_limit='1') */ i, j, count(*) as
cnt "
+ " from " + getTableName()
@@ -571,7 +587,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
return toString(node);
}
- static String toExplainStr(JsonNode mainNode, boolean isMSQE) {
+ public static String toExplainStr(JsonNode mainNode, boolean isMSQE) {
if (mainNode == null) {
return "null";
}
@@ -582,7 +598,7 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
return toExplainString(node, isMSQE);
}
- static String toExplainStr(JsonNode mainNode) {
+ public static String toExplainStr(JsonNode mainNode) {
return toExplainStr(mainNode, false);
}
@@ -643,17 +659,4 @@ public class GroupByOptionsIntegrationTest extends
BaseClusterIntegrationTestSet
return result.toString();
}
}
-
- @AfterClass
- public void tearDown()
- throws Exception {
- dropOfflineTable(DEFAULT_TABLE_NAME);
-
- stopServer();
- stopBroker();
- stopController();
- stopZk();
-
- FileUtils.deleteDirectory(_tempDir);
- }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByTrimmingTest.java
similarity index 86%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByTrimmingTest.java
index 689b7de0a76..00993a75a6d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByTrimmingIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/GroupByTrimmingTest.java
@@ -16,18 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
import java.io.File;
import java.io.IOException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -35,13 +33,10 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toExplainStr;
-import static
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toResultStr;
+import static
org.apache.pinot.integration.tests.custom.GroupByOptionsTest.toExplainStr;
+import static
org.apache.pinot.integration.tests.custom.GroupByOptionsTest.toResultStr;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -52,7 +47,8 @@ import static org.testng.Assert.assertTrue;
// MSQE - segment, inter-segment and intermediate levels
// Note: MSQE doesn't push collations depending on group by result into
aggregation nodes
// so e.g. ORDER BY i*j doesn't trigger trimming even when hints are set
-public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSet {
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class GroupByTrimmingTest extends CustomDataQueryClusterIntegrationTest
{
static final int FILES_NO = 4;
static final int RECORDS_NO = 1000;
@@ -60,42 +56,32 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
static final String J_COL = "j";
static final int SERVERS_NO = 2;
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- startZk();
- startController();
- startServers(SERVERS_NO);
- startBroker();
+ @Override
+ public String getTableName() {
+ return "GroupByTrimmingTest";
+ }
- Schema schema = new
Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
+ @Override
+ public Schema createSchema() {
+ return new Schema.SchemaBuilder().setSchemaName(getTableName())
.addSingleValueDimension(I_COL, FieldSpec.DataType.INT)
.addSingleValueDimension(J_COL, FieldSpec.DataType.LONG)
.build();
- addSchema(schema);
- TableConfig tableConfig = createOfflineTableConfig();
- addTableConfig(tableConfig);
-
- List<File> avroFiles = createAvroFile(_tempDir);
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
- uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
-
- // Wait for all documents loaded
- TestUtils.waitForCondition(() ->
getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L,
- 60_000,
- "Failed to load documents", true, Duration.ofMillis(60_000 / 10));
-
- setUseMultiStageQueryEngine(true);
+ }
- Map<String, List<String>> map =
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+ @Override
+ public List<File> createAvroFiles()
+ throws Exception {
+ return createAvroFile(_tempDir);
+ }
- // make sure segments are split between multiple servers
- assertEquals(map.size(), SERVERS_NO);
+ @Override
+ protected long getCountStarResult() {
+ return FILES_NO * RECORDS_NO;
}
- protected TableConfig createOfflineTableConfig() {
+ @Override
+ public TableConfig createOfflineTableConfig() {
return new TableConfigBuilder(TableType.OFFLINE)
.setTableName(getTableName())
.setNumReplicas(getNumReplicas())
@@ -138,12 +124,17 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
throws Exception {
setUseMultiStageQueryEngine(true);
+ Map<String, List<String>> map =
getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE);
+ // make sure segments are split between multiple servers
+ assertEquals(map.size(), SERVERS_NO);
+
Connection conn = getPinotConnection();
- assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY i*j DESC LIMIT 5"));
+ assertTrimFlagNotSet(conn.execute(
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY i*j DESC LIMIT 5"));
String options = "SET minSegmentGroupTrimSize=5; ";
String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
- + "FROM mytable GROUP BY i, j ORDER BY i*j DESC LIMIT 5 ";
+ + "FROM " + getTableName() + " GROUP BY i, j ORDER BY i*j DESC LIMIT 5
";
ResultSetGroup result = conn.execute(options + query);
assertTrimFlagNotSet(result);
@@ -158,7 +149,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
// <-- order by value is computed here, so trimming in upstream
stages is not possible
+ " PinotLogicalAggregate(group=[{0, 1}],
agg#0=[COUNT($2)], aggType=[FINAL])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName()
+ "])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -173,11 +164,12 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
setUseMultiStageQueryEngine(true);
Connection conn = getPinotConnection();
- assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+ assertTrimFlagNotSet(conn.execute(
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY j DESC LIMIT 5"));
String options = "SET minSegmentGroupTrimSize=5; ";
String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
- + "FROM mytable GROUP BY i, j ORDER BY j DESC LIMIT 5 ";
+ + "FROM " + getTableName() + " GROUP BY i, j ORDER BY j DESC LIMIT 5 ";
ResultSetGroup result = conn.execute(options + query);
assertTrimFlagSet(result);
@@ -199,7 +191,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[1 DESC]],"
+ " limit=[5])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n" // <-- trimming happens here
@@ -215,11 +207,12 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
Connection conn = getPinotConnection();
assertTrimFlagNotSet(
- conn.execute("SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER
BY COUNT(*) DESC LIMIT 5"));
+ conn.execute(
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j
ORDER BY COUNT(*) DESC LIMIT 5"));
String options = "SET minSegmentGroupTrimSize=5; ";
String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
- + "FROM mytable GROUP BY i, j ORDER BY count(*) DESC LIMIT 5 ";
+ + "FROM " + getTableName() + " GROUP BY i, j ORDER BY count(*) DESC
LIMIT 5 ";
ResultSetGroup result = conn.execute(options + query);
assertTrimFlagSet(result);
@@ -244,7 +237,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[2 DESC]],"
+ " limit=[5])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n" //<-- trimming happens here
@@ -259,11 +252,12 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
setUseMultiStageQueryEngine(true);
Connection conn = getPinotConnection();
- assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+ assertTrimFlagNotSet(conn.execute(
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY j DESC LIMIT 5"));
String options = "SET minServerGroupTrimSize = 5; SET groupTrimThreshold =
100; ";
String query = "SELECT /*+ aggOptions(is_enable_group_trim='true') */ i,
j, COUNT(*) "
- + "FROM mytable "
+ + "FROM " + getTableName() + " "
+ "GROUP BY i, j "
+ "ORDER BY j DESC "
+ "LIMIT 5 ";
@@ -287,7 +281,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[1 DESC]],"
+ " limit=[5])\n"
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n"
- + " LeafStageCombineOperator(table=[mytable])\n"
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n"
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n" // <-- trimming happens here
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -302,12 +296,13 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
setUseMultiStageQueryEngine(true);
Connection conn = getPinotConnection();
- assertTrimFlagNotSet(conn.execute("SELECT i, j, COUNT(*) FROM mytable
GROUP BY i, j ORDER BY j DESC LIMIT 5"));
+ assertTrimFlagNotSet(conn.execute(
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY j DESC LIMIT 5"));
// This case is tricky because intermediate results are hash-split among
servers so one gets 50 rows on average.
// That's the reason both limit and trim size needs to be so small.
String query = "SELECT /*+
aggOptions(is_enable_group_trim='true',mse_min_group_trim_size='5') */ i, j,
COUNT(*) "
- + "FROM mytable "
+ + "FROM " + getTableName() + " "
+ "GROUP BY i, j "
+ "ORDER BY j DESC "
+ "LIMIT 5 ";
@@ -331,7 +326,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
+ " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)],
aggType=[FINAL], collations=[[1 DESC]],"
+ " limit=[5])\n" // receives 50-row-big blocks, trimming kicks in
only if limit is lower
+ " PinotLogicalExchange(distribution=[hash[0, 1]])\n" //
splits blocks via hash distribution
- + " LeafStageCombineOperator(table=[mytable])\n" // no
trimming happens 'below'
+ + " LeafStageCombineOperator(table=[" + getTableName() +
"])\n" // no trimming happens 'below'
+ " StreamingInstanceResponse\n"
+ " CombineGroupBy\n"
+ " GroupBy(groupKeys=[[i, j]],
aggregations=[[count(*)]])\n"
@@ -345,7 +340,9 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEFilteredGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, SUM(i) FILTER (WHERE i > 0) FROM mytable
GROUP BY i, j ORDER BY i + j DESC LIMIT 5";
+ String query =
+ "SELECT i, j, SUM(i) FILTER (WHERE i > 0) FROM " + getTableName()
+ + " GROUP BY i, j ORDER BY i + j DESC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -378,7 +375,8 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEGroupsTrimmedAtSegmentLevelWithOrderGroupByKeysDerivedFunctionIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
i + j DESC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY i + j DESC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -408,7 +406,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEGroupsTrimmedAtSegmentLevelWithOrderBySomeGroupByKeysIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
j DESC LIMIT 5";
+ String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY
i, j ORDER BY j DESC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -445,7 +443,8 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
// trimming is safe on rows ordered by all group by keys (regardless of
key order, direction or duplications)
// but not when HAVING clause is present
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i
> 50 ORDER BY i ASC, j ASC";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j
HAVING i > 50 ORDER BY i ASC, j ASC";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -473,7 +472,8 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
setUseMultiStageQueryEngine(false);
// trimming is safe on rows ordered by all group by keys (regardless of
key order, direction or duplications)
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
j ASC, i DESC, j ASC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY j ASC, i DESC, j ASC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -505,7 +505,9 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
setUseMultiStageQueryEngine(false);
// trimming is safe on rows ordered by all group by keys (regardless of
key order, direction or duplications)
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, i, j ORDER
BY j ASC, i DESC, j ASC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName()
+ + " GROUP BY i, i, j ORDER BY j ASC, i DESC, j ASC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -538,7 +540,8 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
setUseMultiStageQueryEngine(false);
// trimming is safe on rows ordered by all group by keys (regardless of
key order or direction)
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
count(*)*j ASC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY count(*)*j ASC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -570,7 +573,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnSomeGroupByKeysIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
i DESC LIMIT 5";
+ String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY
i, j ORDER BY i DESC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -595,7 +598,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
throws Exception {
// for SSQE server level == inter-segment level
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
i, j LIMIT 5";
+ String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY
i, j ORDER BY i, j LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -627,7 +630,8 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
throws Exception {
// for SSQE server level == inter-segment level
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
count(*)*j DESC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY count(*)*j DESC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -636,15 +640,6 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
ResultSetGroup result = conn.execute("SET minServerGroupTrimSize = 5; SET
groupTrimThreshold = 100; " + query);
assertTrimFlagSet(result);
- // Result, though unstable due to concurrent operations on IndexedTable,
is similar to the following
- // (which is not correct):
- //i[INT],j[LONG],count(*)[LONG]
- //98,\t998,\t4
- //94,\t994,\t4
- //90,\t990,\t4
- //86,\t986,\t4
- //79,\t979,\t4
-
assertEquals(toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false),
"BROKER_REDUCE(sort:[times(count(*),j)
DESC],limit:5,postAggregations:times(count(*),j)),\t1,\t0\n"
+ "COMBINE_GROUP_BY,\t2,\t1\n" //<-- trimming happens here
@@ -659,7 +654,9 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEGroupsTrimmedAtInterSegmentLevelWithOrderByOnAllGroupByKeysAndHavingIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i
> 50 ORDER BY i ASC, j ASC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName()
+ + " GROUP BY i, j HAVING i > 50 ORDER BY i ASC, j ASC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -688,7 +685,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysIsSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
i, j LIMIT 5";
+ String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY
i, j ORDER BY i, j LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -719,7 +716,7 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEGroupsTrimmedAtBrokerLevelOrderedBySomeGroupByKeysIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
j DESC LIMIT 5";
+ String query = "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY
i, j ORDER BY j DESC LIMIT 5";
Connection conn = getPinotConnection();
ResultSetGroup result1 = conn.execute(query);
@@ -751,7 +748,9 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByKeysAndHavingIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j HAVING i
> 50 ORDER BY i ASC, j ASC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName()
+ + " GROUP BY i, j HAVING i > 50 ORDER BY i ASC, j ASC LIMIT 5";
Connection conn = getPinotConnection();
ResultSetGroup result1 = conn.execute(query);
@@ -779,7 +778,8 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
public void
testSSQEGroupsTrimmedAtBrokerLevelOrderedByAllGroupByAggregateIsNotSafe()
throws Exception {
setUseMultiStageQueryEngine(false);
- String query = "SELECT i, j, COUNT(*) FROM mytable GROUP BY i, j ORDER BY
count(*)*j DESC LIMIT 5";
+ String query =
+ "SELECT i, j, COUNT(*) FROM " + getTableName() + " GROUP BY i, j ORDER
BY count(*)*j DESC LIMIT 5";
Connection conn = getPinotConnection();
assertTrimFlagNotSet(conn.execute(query));
@@ -788,14 +788,6 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
ResultSetGroup result = conn.execute("SET minBrokerGroupTrimSize = 5; SET
groupTrimThreshold = 50; " + query);
assertTrimFlagSet(result);
- // result is similar to the following, but unstable:
- //i[INT],j[LONG],count(*)[LONG]
- //99,999,4
- //98,998,4
- //97,997,4
- //96,996,4
- //82,982,4
-
assertEquals(toExplainStr(postQuery("EXPLAIN PLAN FOR " + query), false),
"BROKER_REDUCE(sort:[times(count(*),j) DESC],limit:5," //<-- trimming
happens here
+ "postAggregations:times(count(*),j)),\t1,\t0\n"
@@ -814,17 +806,4 @@ public class GroupByTrimmingIntegrationTest extends
BaseClusterIntegrationTestSe
private static void assertTrimFlagSet(ResultSetGroup result) {
assertTrue(result.getBrokerResponse().getExecutionStats().isGroupsTrimmed());
}
-
- @AfterClass
- public void tearDown()
- throws Exception {
- dropOfflineTable(DEFAULT_TABLE_NAME);
-
- stopServer();
- stopBroker();
- stopController();
- stopZk();
-
- FileUtils.deleteDirectory(_tempDir);
- }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
index ed1f439401d..d8672c44146 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MultiColumnTextIndicesTest.java
@@ -52,7 +52,7 @@ import org.testng.annotations.Test;
import static org.apache.avro.Schema.create;
import static org.apache.avro.Schema.createArray;
import static org.apache.avro.Schema.createUnion;
-import static
org.apache.pinot.integration.tests.GroupByOptionsIntegrationTest.toResultStr;
+import static
org.apache.pinot.integration.tests.custom.GroupByOptionsTest.toResultStr;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RowExpressionIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RowExpressionTest.java
similarity index 72%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RowExpressionIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RowExpressionTest.java
index 47c3285c0bb..29972fb64fe 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RowExpressionIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/RowExpressionTest.java
@@ -16,97 +16,57 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
+import java.io.IOException;
import java.util.List;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet {
- private static final String SCHEMA_FILE_NAME =
"On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
- @Override
- protected String getSchemaFileName() {
- return SCHEMA_FILE_NAME;
- }
-
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- // Start the Pinot cluster
- startZk();
- startController();
- startBroker();
- startServer();
- setupTenants();
-
- // Create and upload the schema and table config
- Schema schema = createSchema();
- addSchema(schema);
- TableConfig tableConfig = createOfflineTableConfig();
- addTableConfig(tableConfig);
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class RowExpressionTest extends CustomDataQueryClusterIntegrationTest {
+ private static final String TABLE_NAME = "RowExpressionTest";
+ private static final String SCHEMA_FILE_NAME =
+ "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
- // Unpack the Avro files
- List<File> avroFiles = unpackAvroData(_tempDir);
-
- // Create and upload segments
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
- uploadSegments(getTableName(), _tarDir);
-
- // Set up the H2 connection
- setUpH2Connection(avroFiles);
-
- // Initialize the query generator
- setUpQueryGenerator(avroFiles);
-
- // Wait for all documents loaded
- waitForAllDocsLoaded(600_000L);
-
- // Use multi-stage query engine for all tests
- setUseMultiStageQueryEngine(true);
+ @Override
+ public String getTableName() {
+ return TABLE_NAME;
}
- @BeforeMethod
@Override
- public void resetMultiStage() {
- setUseMultiStageQueryEngine(true);
+ public Schema createSchema() {
+ try {
+ Schema schema = createSchema(SCHEMA_FILE_NAME);
+ schema.setSchemaName(getTableName());
+ return schema;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to load schema: " + SCHEMA_FILE_NAME,
e);
+ }
}
- protected void setupTenants()
+ @Override
+ public List<File> createAvroFiles()
throws Exception {
- // Use default tenant setup
+ return unpackAvroData(_tempDir);
}
- @AfterClass
- public void tearDown()
- throws Exception {
- dropOfflineTable(DEFAULT_TABLE_NAME);
-
- stopServer();
- stopBroker();
- stopController();
- stopZk();
-
- FileUtils.deleteDirectory(_tempDir);
+ @BeforeMethod
+ public void resetMultiStage() {
+ setUseMultiStageQueryEngine(true);
}
@Test
public void testRowEqualityTwoFields()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay) =
(201, 10)";
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE
(AirTime, ArrDelay) = (201, 10)";
JsonNode result = postQuery(query);
assertNoError(result);
assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -115,7 +75,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowEqualityThreeFields()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay,
DepDelay) = (201, 10, 5)";
+ String query =
+ "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime, ArrDelay,
DepDelay) = (201, 10, 5)";
JsonNode result = postQuery(query);
assertNoError(result);
assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -124,7 +85,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowNotEquals()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay) <>
(0, 0)";
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE
(AirTime, ArrDelay) <> (0, 0)";
JsonNode result = postQuery(query);
assertNoError(result);
assertTrue(result.get("numRowsResultSet").asInt() > 0);
@@ -133,7 +94,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowGreaterThan()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) > (200, 230)";
+ String query =
+ "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime,
ActualElapsedTime) > (200, 230)";
JsonNode result = postQuery(query);
assertNoError(result);
long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -143,7 +105,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowGreaterThanOrEqual()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) >= (200, 230)";
+ String query =
+ "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime,
ActualElapsedTime) >= (200, 230)";
JsonNode result = postQuery(query);
assertNoError(result);
long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -153,7 +116,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowLessThan()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) < (100, 120)";
+ String query =
+ "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime,
ActualElapsedTime) < (100, 120)";
JsonNode result = postQuery(query);
assertNoError(result);
long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -163,7 +127,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowLessThanOrEqual()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) <= (100, 120)";
+ String query =
+ "SELECT COUNT(*) FROM " + getTableName() + " WHERE (AirTime,
ActualElapsedTime) <= (100, 120)";
JsonNode result = postQuery(query);
assertNoError(result);
long count = result.get("resultTable").get("rows").get(0).get(0).asLong();
@@ -173,8 +138,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowWithFourFields()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable "
- + "WHERE (AirTime, ArrDelay, DepDelay, Distance) > (200, 0, 0, 1000)";
+ String query = "SELECT COUNT(*) FROM " + getTableName()
+ + " WHERE (AirTime, ArrDelay, DepDelay, Distance) > (200, 0, 0, 1000)";
JsonNode result = postQuery(query);
assertNoError(result);
assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -183,7 +148,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowWithMixedDataTypes()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirlineID, Carrier) >
(20000, 'AA')";
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE
(AirlineID, Carrier) > (20000, 'AA')";
JsonNode result = postQuery(query);
assertNoError(result);
assertTrue(result.get("numRowsResultSet").asInt() >= 0);
@@ -193,7 +158,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
public void testKeysetPaginationUseCase()
throws Exception {
String query1 = "SELECT AirlineID, Carrier, AirTime "
- + "FROM mytable "
+ + "FROM " + getTableName() + " "
+ "ORDER BY AirlineID, Carrier, AirTime "
+ "LIMIT 10";
JsonNode result1 = postQuery(query1);
@@ -207,7 +172,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
String query2 = String.format(
"SELECT AirlineID, Carrier, AirTime "
- + "FROM mytable "
+ + "FROM " + getTableName() + " "
+ "WHERE (AirlineID, Carrier, AirTime) > (%d, '%s', %d) "
+ "ORDER BY AirlineID, Carrier, AirTime "
+ "LIMIT 10",
@@ -225,7 +190,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
public void testRowInComplexQuery()
throws Exception {
String query = "SELECT COUNT(*) FROM ("
- + " SELECT AirlineID, Carrier FROM mytable "
+ + " SELECT AirlineID, Carrier FROM " + getTableName() + " "
+ " WHERE (AirlineID, Carrier) > (20000, 'AA') "
+ " ORDER BY AirlineID, Carrier LIMIT 100"
+ ") AS t";
@@ -238,7 +203,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
public void testRowWithCTE()
throws Exception {
String query = "WITH filtered AS ("
- + " SELECT AirlineID, Carrier, AirTime FROM mytable "
+ + " SELECT AirlineID, Carrier, AirTime FROM " + getTableName() + " "
+ " WHERE AirlineID > 19000"
+ ") "
+ "SELECT COUNT(*) FROM filtered "
@@ -251,7 +216,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testMultipleRowComparisons()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable "
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " "
+ "WHERE (AirTime, ActualElapsedTime) > (100, 120) "
+ "AND (AirTime, ActualElapsedTime) < (500, 600)";
JsonNode result = postQuery(query);
@@ -262,7 +227,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowComparisonWithLiterals()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable "
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " "
+ "WHERE (201, 230) < (AirTime, ActualElapsedTime)";
JsonNode result = postQuery(query);
assertNoError(result);
@@ -273,7 +238,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
public void testExplainPlanWithRowExpression()
throws Exception {
String query = "EXPLAIN PLAN FOR "
- + "SELECT * FROM mytable WHERE (AirTime, ActualElapsedTime) > (200,
230) LIMIT 10";
+ + "SELECT * FROM " + getTableName() + " WHERE (AirTime,
ActualElapsedTime) > (200, 230) LIMIT 10";
JsonNode result = postQuery(query);
assertNoError(result);
String plan = result.get("resultTable").get("rows").get(0).get(1).asText();
@@ -283,7 +248,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowInSelectList()
throws Exception {
- String query = "SELECT (AirTime, ActualElapsedTime) FROM mytable LIMIT 10";
+ String query = "SELECT (AirTime, ActualElapsedTime) FROM " +
getTableName() + " LIMIT 10";
JsonNode result = postQuery(query);
assertTrue(result.get("exceptions").size() > 0, "Expected validation
error");
String errorMessage =
result.get("exceptions").get(0).get("message").asText();
@@ -294,7 +259,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowInGroupBy()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable GROUP BY (AirlineID,
Carrier)";
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " GROUP BY
(AirlineID, Carrier)";
JsonNode result = postQuery(query);
assertTrue(result.get("exceptions").size() > 0, "Expected validation
error");
String errorMessage =
result.get("exceptions").get(0).get("message").asText();
@@ -306,7 +271,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowInOrderBy()
throws Exception {
- String query = "SELECT AirlineID, Carrier FROM mytable ORDER BY
(AirlineID, Carrier) LIMIT 10";
+ String query = "SELECT AirlineID, Carrier FROM " + getTableName()
+ + " ORDER BY (AirlineID, Carrier) LIMIT 10";
JsonNode result = postQuery(query);
assertTrue(result.get("exceptions").size() > 0, "Expected validation
error");
String errorMessage =
result.get("exceptions").get(0).get("message").asText();
@@ -318,7 +284,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testSingleSidedRowComparison()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) > 200";
+ String query = "SELECT COUNT(*) FROM " + getTableName()
+ + " WHERE (AirTime, ActualElapsedTime) > 200";
JsonNode result = postQuery(query);
assertTrue(result.get("exceptions").size() > 0, "Expected validation
error");
String errorMessage =
result.get("exceptions").get(0).get("message").asText();
@@ -329,7 +296,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testMismatchedRowSizes()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) > (200, 230, 250)";
+ String query = "SELECT COUNT(*) FROM " + getTableName()
+ + " WHERE (AirTime, ActualElapsedTime) > (200, 230, 250)";
JsonNode result = postQuery(query);
assertTrue(result.get("exceptions").size() > 0, "Expected validation
error");
String errorMessage =
result.get("exceptions").get(0).get("message").asText();
@@ -340,7 +308,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testEmptyRowExpression()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE () > ()";
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " WHERE () > ()";
JsonNode result = postQuery(query);
assertTrue(result.get("exceptions").size() > 0, "Expected validation
error");
}
@@ -348,7 +316,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowInFunctionCall()
throws Exception {
- String query = "SELECT SUM((AirTime, ActualElapsedTime)) FROM mytable";
+ String query = "SELECT SUM((AirTime, ActualElapsedTime)) FROM " +
getTableName();
JsonNode result = postQuery(query);
assertTrue(result.get("exceptions").size() > 0, "Expected validation
error");
String errorMessage =
result.get("exceptions").get(0).get("message").asText();
@@ -359,7 +327,8 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowWithNullComparison()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable WHERE (AirTime, ArrDelay) >
(200, NULL)";
+ String query = "SELECT COUNT(*) FROM " + getTableName()
+ + " WHERE (AirTime, ArrDelay) > (200, NULL)";
JsonNode result = postQuery(query);
assertNoError(result);
}
@@ -367,7 +336,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowComparisonSameValues()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable "
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " "
+ "WHERE (AirTime, ActualElapsedTime) >= (AirTime, ActualElapsedTime)";
JsonNode result = postQuery(query);
assertNoError(result);
@@ -381,7 +350,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
throws Exception {
// Test ROW in subquery WHERE clause
String query = "SELECT COUNT(*) FROM ("
- + " SELECT * FROM mytable WHERE (AirTime, ActualElapsedTime) > (200,
230)"
+ + " SELECT * FROM " + getTableName() + " WHERE (AirTime,
ActualElapsedTime) > (200, 230)"
+ ") AS subq";
JsonNode result = postQuery(query);
assertNoError(result);
@@ -391,7 +360,7 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowComparisonWithCalculatedFields()
throws Exception {
- String query = "SELECT COUNT(*) FROM mytable "
+ String query = "SELECT COUNT(*) FROM " + getTableName() + " "
+ "WHERE (AirTime * 2, ActualElapsedTime + 10) > (400, 240)";
JsonNode result = postQuery(query);
assertNoError(result);
@@ -401,12 +370,13 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowGreaterThanOrEqualVsExpanded()
throws Exception {
- String rowQuery = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) >= (200, 230)";
+ String rowQuery = "SELECT COUNT(*) FROM " + getTableName()
+ + " WHERE (AirTime, ActualElapsedTime) >= (200, 230)";
JsonNode rowResult = postQuery(rowQuery);
assertNoError(rowResult);
long rowCount =
rowResult.get("resultTable").get("rows").get(0).get(0).asLong();
- String expandedQuery = "SELECT COUNT(*) FROM mytable "
+ String expandedQuery = "SELECT COUNT(*) FROM " + getTableName() + " "
+ "WHERE (AirTime > 200) OR ((AirTime = 200) AND (ActualElapsedTime >
230)) "
+ "OR ((AirTime = 200) AND (ActualElapsedTime = 230))";
JsonNode expandedResult = postQuery(expandedQuery);
@@ -419,12 +389,13 @@ public class RowExpressionIntegrationTest extends
BaseClusterIntegrationTestSet
@Test
public void testRowEqualityVsExpanded()
throws Exception {
- String rowQuery = "SELECT COUNT(*) FROM mytable WHERE (AirTime,
ActualElapsedTime) = (201, 230)";
+ String rowQuery = "SELECT COUNT(*) FROM " + getTableName()
+ + " WHERE (AirTime, ActualElapsedTime) = (201, 230)";
JsonNode rowResult = postQuery(rowQuery);
assertNoError(rowResult);
long rowCount =
rowResult.get("resultTable").get("rows").get(0).get(0).asLong();
- String expandedQuery = "SELECT COUNT(*) FROM mytable "
+ String expandedQuery = "SELECT COUNT(*) FROM " + getTableName() + " "
+ "WHERE (AirTime = 201) AND (ActualElapsedTime = 230)";
JsonNode expandedResult = postQuery(expandedQuery);
assertNoError(expandedResult);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/StarTreeTest.java
similarity index 67%
rename from
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
rename to
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/StarTreeTest.java
index 041b05be9a2..ef9f724016c 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/StarTreeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/StarTreeTest.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.integration.tests;
+package org.apache.pinot.integration.tests.custom;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import org.apache.commons.io.FileUtils;
import org.apache.pinot.integration.tests.startree.SegmentInfoProvider;
import org.apache.pinot.integration.tests.startree.StarTreeQueryGenerator;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -33,10 +34,10 @@ import
org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.util.TestUtils;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -48,22 +49,18 @@ import static org.testng.Assert.assertTrue;
* Integration test for Star-Tree based indexes.
* <ul>
* <li>
- * Set up the Pinot cluster and create two tables, one with default
indexes, one with star tree indexes
+ * Set up the Pinot cluster and create a table with star tree indexes
* </li>
* <li>
- * Send queries to both the tables and assert that results match
- * </li>
- * <li>
- * Query to reference table is sent with TOP 10000, and the comparator
ensures that response from star tree is
- * contained within the reference response. This is to avoid false
failures when groups with same value are
- * truncated due to TOP N
+ * Send queries with and without star-tree and assert that results match
* </li>
* </ul>
*/
-public class StarTreeClusterIntegrationTest extends BaseClusterIntegrationTest
{
+@Test(suiteName = "CustomClusterIntegrationTest")
+public class StarTreeTest extends CustomDataQueryClusterIntegrationTest {
public static final String FILTER_STARTREE_INDEX = "FILTER_STARTREE_INDEX";
private static final String SCHEMA_FILE_NAME =
-
"On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema";
+ "On_Time_On_Time_Performance_2014_100k_subset_nonulls_columns.schema";
private static final int NUM_STAR_TREE_DIMENSIONS = 5;
private static final int NUM_STAR_TREE_METRICS = 6;
private static final List<AggregationFunctionType>
AGGREGATION_FUNCTION_TYPES =
@@ -78,9 +75,38 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
private StarTreeQueryGenerator _starTree1QueryGenerator;
private StarTreeQueryGenerator _starTree2QueryGenerator;
+ // Fixed dimensions and metrics for the first star-tree
+ private final List<String> _starTree1Dimensions =
+ Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime",
"CRSDepTime", "DivArrDelay");
+ private final List<String> _starTree1Metrics =
+ Arrays.asList("CarrierDelay", "DepDelay", "LateAircraftDelay",
"ArrivalDelayGroups", "ArrDel15", "AirlineID");
+
+ // Randomly picked dimensions and metrics for the second star-tree
(initialized in setUp)
+ private List<String> _starTree2Dimensions;
+ private List<String> _starTree2Metrics;
+
+ @Override
+ public String getTableName() {
+ return "StarTreeTest";
+ }
+
@Override
- protected String getSchemaFileName() {
- return SCHEMA_FILE_NAME;
+ public Schema createSchema() {
+ try {
+ InputStream inputStream =
getClass().getClassLoader().getResourceAsStream(SCHEMA_FILE_NAME);
+ Assert.assertNotNull(inputStream);
+ Schema schema = Schema.fromInputStream(inputStream);
+ schema.setSchemaName(getTableName());
+ return schema;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public List<File> createAvroFiles()
+ throws Exception {
+ return unpackAvroData(_tempDir);
}
// NOTE: Star-Tree and SegmentInfoProvider does not work on no-dictionary
dimensions
@@ -89,71 +115,67 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
return Arrays.asList("ActualElapsedTime", "ArrDelay", "DepDelay");
}
- @BeforeClass
- public void setUp()
- throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- // Start the Pinot cluster
- startZk();
- startController();
- startBroker();
- startServers(2);
+ @Override
+ public String getTimeColumnName() {
+ return "DaysSinceEpoch";
+ }
- // Create and upload the schema and table config
+ @Override
+ public TableConfig createOfflineTableConfig() {
Schema schema = createSchema();
- addSchema(schema);
-
- // Pick fixed dimensions and metrics for the first star-tree
- List<String> starTree1Dimensions =
- Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime",
"CRSDepTime", "DivArrDelay");
- List<String> starTree1Metrics =
- Arrays.asList("CarrierDelay", "DepDelay", "LateAircraftDelay",
"ArrivalDelayGroups", "ArrDel15", "AirlineID");
- int starTree1MaxLeafRecords = 10;
// Randomly pick some dimensions and metrics for the second star-tree
// Exclude TotalAddGTime since it's a multi-value column, should not be in
dimension split order.
List<String> allDimensions = new ArrayList<>(schema.getDimensionNames());
allDimensions.remove("TotalAddGTime");
Collections.shuffle(allDimensions, _random);
- List<String> starTree2Dimensions = allDimensions.subList(0,
NUM_STAR_TREE_DIMENSIONS);
+ _starTree2Dimensions = allDimensions.subList(0, NUM_STAR_TREE_DIMENSIONS);
List<String> allMetrics = new ArrayList<>(schema.getMetricNames());
Collections.shuffle(allMetrics, _random);
- List<String> starTree2Metrics = allMetrics.subList(0,
NUM_STAR_TREE_METRICS);
+ _starTree2Metrics = allMetrics.subList(0, NUM_STAR_TREE_METRICS);
+
+ int starTree1MaxLeafRecords = 10;
int starTree2MaxLeafRecords = 100;
+ int starTree3MaxLeafRecords = 10;
// Tests StarTree aggregate for multi-value column
List<String> starTree3Dimensions =
Arrays.asList("OriginCityName", "DepTimeBlk", "LongestAddGTime",
"CRSDepTime", "DivArrDelay");
- int starTree3MaxLeafRecords = 10;
- TableConfig tableConfig = createOfflineTableConfig();
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+ .setTableName(getTableName())
+ .setTimeColumnName(getTimeColumnName())
+ .setNoDictionaryColumns(getNoDictionaryColumns())
+ .setNumReplicas(getNumReplicas())
+ .setBrokerTenant(getBrokerTenant())
+ .setServerTenant(getServerTenant())
+ .build();
tableConfig.getIndexingConfig().setStarTreeIndexConfigs(
- Arrays.asList(getStarTreeIndexConfig(starTree1Dimensions,
starTree1Metrics, starTree1MaxLeafRecords),
- getStarTreeIndexConfig(starTree2Dimensions, starTree2Metrics,
starTree2MaxLeafRecords),
+ Arrays.asList(
+ getStarTreeIndexConfig(_starTree1Dimensions, _starTree1Metrics,
starTree1MaxLeafRecords),
+ getStarTreeIndexConfig(_starTree2Dimensions, _starTree2Metrics,
starTree2MaxLeafRecords),
getStarTreeIndexConfigForMVColAgg(starTree3Dimensions,
starTree3MaxLeafRecords)));
- addTableConfig(tableConfig);
- // Unpack the Avro files
- List<File> avroFiles = unpackAvroData(_tempDir);
-
- // Create and upload segments
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig,
schema, 0, _segmentDir, _tarDir);
- uploadSegments(DEFAULT_TABLE_NAME, _tarDir);
+ return tableConfig;
+ }
- // Set up the query generators
- SegmentInfoProvider segmentInfoProvider = new
SegmentInfoProvider(_tarDir.getPath());
- List<String> aggregationFunctions = new
ArrayList<>(AGGREGATION_FUNCTION_TYPES.size());
- for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
- aggregationFunctions.add(functionType.getName());
+ /**
+ * Initialize query generators after segments have been built and uploaded.
+ * This is called lazily before the first test that needs them.
+ */
+ private void ensureQueryGeneratorsInitialized()
+ throws Exception {
+ if (_starTree1QueryGenerator == null || _starTree2QueryGenerator == null) {
+ List<String> aggregationFunctions = new
ArrayList<>(AGGREGATION_FUNCTION_TYPES.size());
+ for (AggregationFunctionType functionType : AGGREGATION_FUNCTION_TYPES) {
+ aggregationFunctions.add(functionType.getName());
+ }
+ SegmentInfoProvider segmentInfoProvider = new
SegmentInfoProvider(_tarDir.getPath());
+ _starTree1QueryGenerator = new StarTreeQueryGenerator(getTableName(),
_starTree1Dimensions, _starTree1Metrics,
+ segmentInfoProvider.getSingleValueDimensionValuesMap(),
aggregationFunctions, _random);
+ _starTree2QueryGenerator = new StarTreeQueryGenerator(getTableName(),
_starTree2Dimensions, _starTree2Metrics,
+ segmentInfoProvider.getSingleValueDimensionValuesMap(),
aggregationFunctions, _random);
}
- _starTree1QueryGenerator = new StarTreeQueryGenerator(DEFAULT_TABLE_NAME,
starTree1Dimensions, starTree1Metrics,
- segmentInfoProvider.getSingleValueDimensionValuesMap(),
aggregationFunctions, _random);
- _starTree2QueryGenerator = new StarTreeQueryGenerator(DEFAULT_TABLE_NAME,
starTree2Dimensions, starTree2Metrics,
- segmentInfoProvider.getSingleValueDimensionValuesMap(),
aggregationFunctions, _random);
-
- // Wait for all documents loaded
- waitForAllDocsLoaded(600_000L);
}
private static StarTreeIndexConfig getStarTreeIndexConfig(List<String>
dimensions, List<String> metrics,
@@ -186,6 +208,7 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
public void testGeneratedQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ ensureQueryGeneratorsInitialized();
for (int i = 0; i < NUM_QUERIES_TO_GENERATE; i += 2) {
testStarQuery(_starTree1QueryGenerator.nextQuery(), false);
testStarQuery(_starTree2QueryGenerator.nextQuery(), false);
@@ -196,19 +219,22 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
public void testHardCodedQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String tableName = getTableName();
+
// This query can test the case of one predicate matches all the child
nodes but star-node cannot be used because
// the predicate is included as remaining predicate from another branch
- String starQuery = "SELECT DepTimeBlk, COUNT(*) FROM mytable "
- + "WHERE CRSDepTime BETWEEN 1137 AND 1849 AND DivArrDelay > 218 AND
CRSDepTime NOT IN (35, 1633, 1457, 140) "
+ String starQuery = "SELECT DepTimeBlk, COUNT(*) FROM " + tableName
+ + " WHERE CRSDepTime BETWEEN 1137 AND 1849 AND DivArrDelay > 218 AND
CRSDepTime NOT IN (35, 1633, 1457, 140) "
+ "AND LongestAddGTime NOT IN (17, 105, 20, 22) GROUP BY DepTimeBlk
ORDER BY DepTimeBlk";
testStarQuery(starQuery, !useMultiStageQueryEngine);
// Test MIN, MAX, SUM rewrite on LONG col
- starQuery =
- "SELECT MIN(AirlineID), MAX(AirlineID), SUM(AirlineID) FROM mytable
WHERE CRSDepTime BETWEEN 1137 AND 1849";
+ starQuery = "SELECT MIN(AirlineID), MAX(AirlineID), SUM(AirlineID) FROM "
+ tableName
+ + " WHERE CRSDepTime BETWEEN 1137 AND 1849";
testStarQuery(starQuery, !useMultiStageQueryEngine);
- starQuery = "SET enableNullHandling=true; SELECT COUNT(DivArrDelay) FROM
mytable WHERE DivArrDelay > 218";
+ starQuery = "SET enableNullHandling=true; SELECT COUNT(DivArrDelay) FROM "
+ tableName
+ + " WHERE DivArrDelay > 218";
testStarQuery(starQuery, false);
}
@@ -216,20 +242,19 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
public void testHardCodedFilteredAggQueries(boolean useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
- String starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE
CRSDepTime = 35) FROM mytable "
- + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
- // Don't verify that the query plan uses StarTree index, as this query
results in FILTER_EMPTY in the query plan.
- // This is still a valuable test, as it caught a bug where only the
subFilterContext was being preserved through
- // AggregationFunctionUtils#buildFilteredAggregationInfos
+ String tableName = getTableName();
+
+ String starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE
CRSDepTime = 35) FROM " + tableName
+ + " WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
testStarQuery(starQuery, false);
// Ensure the filtered agg and unfiltered agg can co-exist in one query
- starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE
DivArrDelay > 20) FROM mytable "
- + "WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+ starQuery = "SELECT DepTimeBlk, COUNT(*), COUNT(*) FILTER (WHERE
DivArrDelay > 20) FROM " + tableName
+ + " WHERE CRSDepTime != 35 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
testStarQuery(starQuery, !useMultiStageQueryEngine);
- starQuery = "SELECT DepTimeBlk, COUNT(*) FILTER (WHERE CRSDepTime != 35)
FROM mytable "
- + "GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+ starQuery = "SELECT DepTimeBlk, COUNT(*) FILTER (WHERE CRSDepTime != 35)
FROM " + tableName
+ + " GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
testStarQuery(starQuery, !useMultiStageQueryEngine);
}
@@ -237,24 +262,25 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
public void testMultiValueColumnAggregations(boolean
useMultiStageQueryEngine)
throws Exception {
setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ String tableName = getTableName();
- String starQuery = "SELECT COUNTMV(TotalAddGTime), SUMMV(TotalAddGTime),
AVGMV(TotalAddGTime) FROM mytable";
+ String starQuery = "SELECT COUNTMV(TotalAddGTime), SUMMV(TotalAddGTime),
AVGMV(TotalAddGTime) FROM " + tableName;
testStarQuery(starQuery, !useMultiStageQueryEngine);
- starQuery = "SELECT OriginCityName, COUNTMV(TotalAddGTime),
AVGMV(TotalAddGTime), SUMMV(TotalAddGTime) "
- + "FROM mytable GROUP BY OriginCityName ORDER BY OriginCityName";
+ starQuery = "SELECT OriginCityName, COUNTMV(TotalAddGTime),
AVGMV(TotalAddGTime), SUMMV(TotalAddGTime) FROM "
+ + tableName + " GROUP BY OriginCityName ORDER BY OriginCityName";
testStarQuery(starQuery, !useMultiStageQueryEngine);
- starQuery = "SELECT DepTimeBlk, SUMMV(TotalAddGTime), AVGMV(TotalAddGTime)
FROM mytable "
- + "WHERE CRSDepTime > 1000 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
+ starQuery = "SELECT DepTimeBlk, SUMMV(TotalAddGTime), AVGMV(TotalAddGTime)
FROM " + tableName
+ + " WHERE CRSDepTime > 1000 GROUP BY DepTimeBlk ORDER BY DepTimeBlk";
testStarQuery(starQuery, !useMultiStageQueryEngine);
- starQuery = "SELECT OriginCityName, DepTimeBlk, SUMMV(TotalAddGTime) FROM
mytable "
- + "GROUP BY OriginCityName, DepTimeBlk ORDER BY OriginCityName,
DepTimeBlk LIMIT 100";
+ starQuery = "SELECT OriginCityName, DepTimeBlk, SUMMV(TotalAddGTime) FROM
" + tableName
+ + " GROUP BY OriginCityName, DepTimeBlk ORDER BY OriginCityName,
DepTimeBlk LIMIT 100";
testStarQuery(starQuery, !useMultiStageQueryEngine);
- starQuery = "SELECT CRSDepTime, AVGMV(TotalAddGTime) FROM mytable "
- + "WHERE CRSDepTime BETWEEN 800 AND 1200 AND DivArrDelay < 100 "
+ starQuery = "SELECT CRSDepTime, AVGMV(TotalAddGTime) FROM " + tableName
+ + " WHERE CRSDepTime BETWEEN 800 AND 1200 AND DivArrDelay < 100 "
+ "GROUP BY CRSDepTime ORDER BY CRSDepTime";
testStarQuery(starQuery, !useMultiStageQueryEngine);
}
@@ -297,17 +323,4 @@ public class StarTreeClusterIntegrationTest extends
BaseClusterIntegrationTest {
+ "Star Query: %s\nStar Response: %s\nReference Query:
%s\nReference Response: %s\nRandom Seed: %d",
starQuery, starResponse, referenceQuery, referenceResponse,
_randomSeed));
}
-
- @AfterClass
- public void tearDown()
- throws Exception {
- dropOfflineTable(DEFAULT_TABLE_NAME);
-
- stopServer();
- stopBroker();
- stopController();
- stopZk();
-
- FileUtils.deleteDirectory(_tempDir);
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]