This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5172d76a67 Migrate current integration batch tests to equivalent MSQ tests (#13374)
5172d76a67 is described below
commit 5172d76a6720a217c321e90b9294435ca60242ea
Author: abhagraw <[email protected]>
AuthorDate: Mon Nov 21 09:12:02 2022 +0530
Migrate current integration batch tests to equivalent MSQ tests (#13374)
* Migrate current integration batch tests to equivalent MSQ tests using new IT framework
* Fix build issues
* Trigger Build
* Adding more tests and addressing comments
* fixBuildIssues
* fix dependency issues
* Parameterized the test and addressed comments
* Addressing comments
* fixing checkstyle errors
* Addressing comments
---
.travis.yml | 16 +--
distribution/pom.xml | 2 +
integration-tests-ex/cases/pom.xml | 19 ++-
.../testsEx/msq/AbstractITSQLBasedIngestion.java | 121 +++++++++++++++++
.../testsEx/msq/ITSQLBasedBatchIngestion.java | 71 ++++++++++
.../batch-index/json_path_index_queries.json | 49 +++++++
.../multi-stage-query/batch-index/msq_inline.sql | 17 +++
.../batch-index/sparse_column_msq.json | 93 +++++++++++++
.../batch-index/sparse_column_msq.sql | 21 +++
.../batch-index/wikipedia_http_inputsource_msq.sql | 29 ++++
.../wikipedia_http_inputsource_queries.json | 47 +++++++
.../batch-index/wikipedia_index_msq.sql | 32 +++++
.../batch-index/wikipedia_index_queries.json | 150 +++++++++++++++++++++
.../wikipedia_index_queries_with_transform.json | 62 +++++++++
.../wikipedia_index_task_with_transform.sql | 32 +++++
.../batch-index/wikipedia_merge_index_task.sql | 33 +++++
.../druid/testing/utils/MsqTestQueryHelper.java | 31 ++++-
17 files changed, 808 insertions(+), 17 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 2fd32ffd7b..2d785486ac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -442,10 +442,6 @@ jobs:
docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
done
- - <<: *integration_batch_index
- name: "(Compile=openjdk8, Run=openjdk8) batch index integration test
with Indexer"
- env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8'
USE_INDEXER='indexer'
-
- &integration_input_format
name: "(Compile=openjdk8, Run=openjdk8) input format integration test"
stage: Tests - phase 2
@@ -689,11 +685,13 @@ jobs:
env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
script: ./it.sh travis Catalog
- # Disabling BatchIndex test as it is failing with due to timeout, fixing it will be taken in a separate PR.
- #- <<: *integration_tests_ex
- # name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
- # env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
- # script: ./it.sh travis BatchIndex
+ - &integration_tests_ex
+ name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)"
+ stage: Tests - phase 2
+ jdk: openjdk8
+ services: *integration_test_services
+ env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
+ script: ./it.sh travis BatchIndex
# END - Integration tests for Compile with Java 8 and Run with Java 8
diff --git a/distribution/pom.xml b/distribution/pom.xml
index b73e4215a6..e2b7773b09 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -225,6 +225,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-multi-stage-query</argument>
<argument>-c</argument>
+ <argument>org.apache.druid.extensions:druid-catalog</argument>
+ <argument>-c</argument>
<argument>org.apache.druid.extensions:druid-protobuf-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:mysql-metadata-storage</argument>
diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml
index 5456d4b81b..cf781f6f88 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -155,7 +155,7 @@
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>mysql-metadata-storage</artifactId>
<version>${project.parent.version}</version>
@@ -218,10 +218,15 @@
<artifactId>JUnitParams</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>javax.ws.rs</groupId>
- <artifactId>jsr311-api</artifactId>
- </dependency>
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ <version>5.4.0</version>
+ </dependency>
</dependencies>
<!-- Exclude ITs from surefire. Required because they end with "Test". -->
@@ -345,8 +350,8 @@
<goal>integration-test</goal>
</goals>
<configuration>
- <!-- our tests are very verbose, let's keep
the volume down -->
-
<redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <!-- Turning on logs so that travis does not
time out tests for not providing any output. -->
+
<redirectTestOutputToFile>False</redirectTestOutputToFile>
<!-- Can run only one test category per Maven
run. -->
<groups>org.apache.druid.testsEx.categories.${it.category}</groups>
</configuration>
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
new file mode 100644
index 0000000000..4bb1cdc478
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testing.utils.TestQueryHelper;
+import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class AbstractITSQLBasedIngestion
+{
+ public static final Logger LOG = new Logger(TestQueryHelper.class);
+ @Inject
+ private MsqTestQueryHelper msqHelper;
+
+ @Inject
+ protected TestQueryHelper queryHelper;
+
+ @Inject
+ private DataLoaderHelper dataLoaderHelper;
+
+ /**
+ * Reads the file as a UTF-8 string and replaces %%DATASOURCE%% with the provided datasource value.
+ */
+ protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource)
+ {
+ String fileString;
+ try {
+ InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath);
+ fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "could not read query file: %s", filePath);
+ }
+
+ fileString = StringUtils.replace(
+ fileString,
+ "%%DATASOURCE%%",
+ datasource
+ );
+
+ return fileString;
+ }
+
+ /**
+ * Reads native queries from a file and runs them against the provided datasource.
+ */
+ protected void doTestQuery(String queryFilePath, String dataSource)
+ {
+ try {
+ String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource);
+ queryHelper.testQueriesFromString(query);
+ }
+ catch (Exception e) {
+ LOG.error(e, "Error while running test query");
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Submits a SQL task, waits for task completion, and then runs test queries on the ingested datasource.
+ */
+ protected void submitTaskAnddoTestQuery(String sqlTask, String queryFilePath, String datasource,
+ Map<String, Object> msqContext) throws Exception
+ {
+ LOG.info("SqlTask - \n %s", sqlTask);
+
+ // Submit the tasks and wait for the datasource to get loaded
+ msqHelper.submitMsqTaskAndWaitForCompletion(
+ sqlTask,
+ msqContext
+ );
+
+ dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+ doTestQuery(queryFilePath, datasource);
+ }
+
+ /**
+ * Runs an MSQ ingestion SQL test.
+ *
+ * @param sqlFilePath path of the file containing the SQL query.
+ * @param queryFilePath path of the file containing the native test queries to be run on the ingested datasource.
+ * @param datasource name of the datasource. %%DATASOURCE%% in the SQL and queries will be replaced with this value.
+ * @param msqContext context parameters to be passed with the MSQ API call.
+ */
+ protected void runMSQTaskandTestQueries(String sqlFilePath, String queryFilePath, String datasource,
+ Map<String, Object> msqContext) throws Exception
+ {
+ LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath);
+
+ String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource);
+ submitTaskAnddoTestQuery(sqlTask, queryFilePath, datasource, msqContext);
+ }
+}
diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
new file mode 100644
index 0000000000..dbc26d7d40
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import junitparams.Parameters;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedIngestion
+{
+ private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/";
+
+ public static List<List<String>> test_cases()
+ {
+ return Arrays.asList(
+ Arrays.asList("msq_inline.sql", "json_path_index_queries.json"),
+ Arrays.asList("sparse_column_msq.sql", "sparse_column_msq.json"),
+ Arrays.asList("wikipedia_http_inputsource_msq.sql",
"wikipedia_http_inputsource_queries.json"),
+ Arrays.asList("wikipedia_index_msq.sql",
"wikipedia_index_queries.json"),
+ Arrays.asList("wikipedia_merge_index_task.sql",
"wikipedia_index_queries.json"),
+ Arrays.asList("wikipedia_index_task_with_transform.sql",
"wikipedia_index_queries_with_transform.json")
+ );
+
+ }
+
+ @Test
+ @Parameters(method = "test_cases")
+ public void testSQLBasedBatchIngestion(String sqlFileName, String queryFileName)
+ {
+ try {
+ runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIR + sqlFileName,
+ BATCH_INDEX_TASKS_DIR + queryFileName,
+ FilenameUtils.removeExtension(sqlFileName),
+ ImmutableMap.of("finalizeAggregations", false,
+ "maxNumTasks", 5,
+ "groupByEnableMultiValueUnnesting", false
+ ));
+ }
+ catch (Exception e) {
+ LOG.error(e, "Error while testing [%s, %s]", sqlFileName, queryFileName);
+ throw new RuntimeException(e);
+ }
+ }
+}
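[Editorial note: the parameterized runner above pairs each ingestion SQL file with a JSON file of native verification queries. A minimal sketch of how one more pair could be registered in test_cases(); the file names here are hypothetical and not part of this commit.]

    // Hypothetical extra entry for test_cases(); "my_new_ingest.sql" and
    // "my_new_ingest_queries.json" are illustrative names only and would need to
    // be added under /multi-stage-query/batch-index/ like the resources above.
    Arrays.asList("my_new_ingest.sql", "my_new_ingest_queries.json")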
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
new file mode 100644
index 0000000000..845af00dd8
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
@@ -0,0 +1,49 @@
+[
+ {
+ "description": "timeseries",
+ "query": {
+ "queryType": "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "intervals": [
+ "1000/3000"
+ ],
+ "aggregations": [
+ {
+ "type": "longSum",
+ "name": "len",
+ "fieldName": "len"
+ },
+ {
+ "type": "longSum",
+ "name": "max",
+ "fieldName": "max"
+ },
+ {
+ "type": "longSum",
+ "name": "min",
+ "fieldName": "min"
+ },
+ {
+ "type": "longSum",
+ "name": "sum",
+ "fieldName": "sum"
+ }
+ ],
+ "granularity": {
+ "type": "all"
+ }
+ },
+ "expectedResults": [
+ {
+ "timestamp": "2013-08-31T01:02:33.000Z",
+ "result": {
+ "sum": 10,
+ "min": 0,
+ "len": 5,
+ "max": 4
+ }
+ }
+ ]
+ }
+]
+
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
new file mode 100644
index 0000000000..a710691574
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
@@ -0,0 +1,17 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"inline","data":"{\"timestamp\": \"2013-08-31T01:02:33Z\",
\"values\": [0,1,2,3,4] }"}',
+
'{"type":"json","flattenSpec":{"useFieldDiscovery":true,"fields":[{"type":"path","name":"len","expr":"$.values.length()"},{"type":"path","name":"min","expr":"$.values.min()"},{"type":"path","name":"max","expr":"$.values.max()"},{"type":"path","name":"sum","expr":"$.values.sum()"}]}}',
+
'[{"name":"timestamp","type":"string"},{"name":"len","type":"long"},{"name":"min","type":"long"},{"name":"max","type":"long"},{"name":"sum","type":"long"}]'
+ )
+))
+SELECT
+ TIME_PARSE("timestamp") AS __time,
+ "len",
+ "min",
+ "max",
+ "sum"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5
+PARTITIONED BY HOUR
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
new file mode 100644
index 0000000000..4c2c5aa295
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
@@ -0,0 +1,93 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query":{
+ "queryType" : "timeBoundary",
+ "dataSource": "%%DATASOURCE%%"
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2015-09-12T00:00:00.000Z",
+ "result" : {
+ "minTime" : "2015-09-12T00:00:00.000Z",
+ "maxTime" : "2015-09-12T00:00:00.000Z"
+ }
+ }
+ ]
+ },
+ {
+ "description": "scan, all",
+ "query": {
+ "queryType": "scan",
+ "dataSource": "%%DATASOURCE%%",
+ "intervals": [
+ "2013-01-01/2020-01-02"
+ ],
+ "resultFormat":"compactedList"
+ },
+ "expectedResults": [{
+ "segmentId":"dstsparse_column_msq.json_2015-09-12T00:00:00.000Z_2015-09-12T01:00:00.000Z_2022-11-17T12:32:11.247Z",
+ "columns":["__time","dimB","dimA","dimC","dimD","dimE","dimF","count","sum_metA"],
+ "events":[
+ [1442016000000,"F","C",null,null,null,null,1,1],
+ [1442016000000,"J","C",null,null,null,null,1,1],
+ [1442016000000,"R","J",null,null,null,null,1,1],
+ [1442016000000,"S","Z",null,null,null,null,1,1],
+ [1442016000000,"T","H",null,null,null,null,1,1],
+ [1442016000000,"X",null,"A",null,null,null,1,1],
+ [1442016000000,"X","H",null,null,null,null,3,3],
+ [1442016000000,"Z","H",null,null,null,null,1,1]
+ ]
+ }],
+ "fieldsToTest": ["events"]
+ },
+ {
+ "description": "roll up ratio",
+ "query": {
+ "queryType":"timeseries",
+ "dataSource":{
+ "type":"table",
+ "name":"%%DATASOURCE%%"
+ },
+ "intervals":{
+ "type":"intervals",
+ "intervals":[
+ "2013-01-01/2020-01-02"
+ ]
+ },
+ "granularity":{
+ "type":"all"
+ },
+ "aggregations":[
+ {
+ "type":"count",
+ "name":"a0"
+ },
+ {
+ "type":"longSum",
+ "name":"a1",
+ "fieldName":"count",
+ "expression":null
+ }
+ ],
+ "postAggregations":[
+ {
+ "type":"expression",
+ "name":"p0",
+ "expression":"((\"a0\" * 1.00) / \"a1\")",
+ "ordering":null
+ }
+ ]
+ },
+ "expectedResults": [
+ {
+ "timestamp" : "2015-09-12T00:00:00.000Z",
+ "result" : {
+ "a1" : 10,
+ "p0" : 0.8,
+ "a0" : 8
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
new file mode 100644
index 0000000000..f844f59964
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
@@ -0,0 +1,21 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"inline","data":"{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\
[...]
+ '{"type":"json"}',
+ '[{"name":"time","type":"string"},{"name":"dimB","type":"string"},{"name":"dimA","type":"string"},{"name":"dimC","type":"string"},{"name":"dimD","type":"string"},{"name":"dimE","type":"string"},{"name":"dimF","type":"string"},{"name":"metA","type":"long"}]'
+ )
+))
+SELECT
+ TIME_FLOOR(TIME_PARSE("time"), 'PT1H') AS __time,
+ "dimB",
+ "dimA",
+ "dimC",
+ "dimD",
+ "dimE",
+ "dimF",
+ COUNT(*) AS "count",
+ SUM("metA") AS "sum_metA"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7
+PARTITIONED BY HOUR
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
new file mode 100644
index 0000000000..f1af33bed4
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
@@ -0,0 +1,29 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz","https://druid.apache.org/data/wikipedia.json.gz"]}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double
[...]
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
new file mode 100644
index 0000000000..2d454d59d8
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
@@ -0,0 +1,47 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query":{
+ "queryType" : "timeBoundary",
+ "dataSource": "%%DATASOURCE%%"
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2016-06-27T00:00:11.000Z",
+ "result" : {
+ "minTime" : "2016-06-27T00:00:11.000Z",
+ "maxTime" : "2016-06-27T21:31:02.000Z"
+ }
+ }
+ ]
+ },
+ {
+ "description": "simple aggr",
+ "query":{
+ "queryType" : "topN",
+ "dataSource" : "%%DATASOURCE%%",
+ "intervals" : ["2016-06-27/2016-06-28"],
+ "granularity" : "all",
+ "dimension" : "page",
+ "metric" : "count",
+ "threshold" : 3,
+ "aggregations" : [
+ {
+ "type" : "count",
+ "name" : "count"
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2016-06-27T00:00:11.000Z",
+ "result" :
+ [
+ {"count":29,"page":"Copa América Centenario"},
+ {"count":16,"page":"User:Cyde/List of candidates for
speedy deletion/Subpage"},
+ {"count":16,"page":"Wikipedia:Administrators'
noticeboard/Incidents"}
+ ]
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
new file mode 100644
index 0000000000..738e39fb87
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
@@ -0,0 +1,32 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" as (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"local","files":["/resources/data/batch_index/json/wikipedia_index_data1.json","/resources/data/batch_index/json/wikipedia_index_data2.json","/resources/data/batch_index/json/wikipedia_index_data3.json"]}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double
[...]
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta",
+ APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+ DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+ APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
new file mode 100644
index 0000000000..928effe65e
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
@@ -0,0 +1,150 @@
+[
+ {
+ "description": "timeseries, 1 agg, all",
+ "query":{
+ "queryType" : "timeBoundary",
+ "dataSource": "%%DATASOURCE%%"
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T01:02:33.000Z",
+ "result" : {
+ "minTime" : "2013-08-31T01:02:33.000Z",
+ "maxTime" : "2013-09-01T12:41:27.000Z"
+ }
+ }
+ ]
+ },
+ {
+ "description": "timeseries, datasketch aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"day",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "HLLSketchMerge",
+ "name": "approxCountHLL",
+ "fieldName": "HLLSketchBuild",
+ "lgK": 12,
+ "tgtHllType": "HLL_4",
+ "round": true
+ },
+ {
+ "type":"thetaSketch",
+ "name":"approxCountTheta",
+ "fieldName":"thetaSketch",
+ "size":16384,
+ "shouldFinalize":true,
+ "isInputThetaSketch":false,
+ "errorBoundsStdDev":null
+ },
+ {
+ "type":"quantilesDoublesSketch",
+ "name":"quantilesSketch",
+ "fieldName":"quantilesDoublesSketch",
+ "k":128
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "result" : {
+ "quantilesSketch":5,
+ "approxCountTheta":5.0,
+ "approxCountHLL":5
+ }
+ }
+ ]
+ },
+ {
+ "description":"having spec on post aggregation",
+ "query":{
+ "queryType":"groupBy",
+ "dataSource":"%%DATASOURCE%%",
+ "granularity":"day",
+ "dimensions":[
+ "page"
+ ],
+ "filter":{
+ "type":"selector",
+ "dimension":"language",
+ "value":"zh"
+ },
+ "aggregations":[
+ {
+ "type":"count",
+ "name":"rows"
+ },
+ {
+ "type":"longSum",
+ "fieldName":"added",
+ "name":"added_count"
+ }
+ ],
+ "postAggregations": [
+ {
+ "type":"arithmetic",
+ "name":"added_count_times_ten",
+ "fn":"*",
+ "fields":[
+ {"type":"fieldAccess", "name":"added_count",
"fieldName":"added_count"},
+ {"type":"constant", "name":"const", "value":10}
+ ]
+ }
+ ],
+ "having":{"type":"greaterThan",
"aggregation":"added_count_times_ten", "value":9000},
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ]
+ },
+ "expectedResults":[ {
+ "version" : "v1",
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "event" : {
+ "added_count_times_ten" : 9050.0,
+ "page" : "Crimson Typhoon",
+ "added_count" : 905,
+ "rows" : 1
+ }
+ } ]
+ },
+ {
+ "description": "timeseries, stringFirst/stringLast aggs, all",
+ "query":{
+ "queryType" : "timeseries",
+ "dataSource": "%%DATASOURCE%%",
+ "granularity":"day",
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ],
+ "filter":null,
+ "aggregations":[
+ {
+ "type": "stringFirst",
+ "name": "first_user",
+ "fieldName": "user"
+ },
+ {
+ "type":"stringLast",
+ "name":"last_user",
+ "fieldName":"user"
+ }
+ ]
+ },
+ "expectedResults":[
+ {
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "result" : {
+ "first_user":"nuclear",
+ "last_user":"stringer"
+ }
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
new file mode 100644
index 0000000000..f0cfba6773
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
@@ -0,0 +1,62 @@
+[
+ {
+ "description":"having spec on post aggregation",
+ "query":{
+ "queryType":"groupBy",
+ "dataSource":"%%DATASOURCE%%",
+ "granularity":"day",
+ "dimensions":[
+ "page",
+ "city"
+ ],
+ "filter":{
+ "type":"selector",
+ "dimension":"language",
+ "value":"language-zh"
+ },
+ "aggregations":[
+ {
+ "type":"count",
+ "name":"rows"
+ },
+ {
+ "type":"longSum",
+ "fieldName":"triple-added",
+ "name":"added_count"
+ },
+ {
+ "type":"longSum",
+ "fieldName":"delta",
+ "name":"delta_sum"
+ }
+ ],
+ "postAggregations": [
+ {
+ "type":"arithmetic",
+ "name":"added_count_times_ten",
+ "fn":"*",
+ "fields":[
+ {"type":"fieldAccess", "name":"added_count",
"fieldName":"added_count"},
+ {"type":"constant", "name":"const", "value":10}
+ ]
+ }
+ ],
+ "having":{"type":"greaterThan",
"aggregation":"added_count_times_ten", "value":9000},
+ "intervals":[
+ "2013-08-31T00:00/2013-09-01T00:00"
+ ]
+ },
+ "expectedResults":[ {
+ "version" : "v1",
+ "timestamp" : "2013-08-31T00:00:00.000Z",
+ "event" : {
+ "added_count_times_ten" : 27150.0,
+ "page" : "Crimson Typhoon",
+ "city" : "Taiyuan",
+ "added_count" : 2715,
+ "delta_sum" : 900,
+ "rows" : 1
+ }
+ } ]
+ }
+]
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
new file mode 100644
index 0000000000..ebdeeda689
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
@@ -0,0 +1,32 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"triple-added","type":"d
[...]
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ concat('language-', "language") AS "language",
+ "user",
+ "unpatrolled",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("added")*3 AS "triple-added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta",
+ APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+ DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+ APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
new file mode 100644
index 0000000000..a8160aa905
--- /dev/null
+++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
@@ -0,0 +1,33 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+ EXTERN(
+ '{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
+ '{"type":"json"}',
+ '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double
[...]
+ )
+))
+SELECT
+ TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+ "page",
+ "language",
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city",
+ "timestamp",
+ COUNT(*) AS "count",
+ SUM("added") AS "added",
+ SUM("deleted") AS "deleted",
+ SUM("delta") AS "delta",
+ APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+ DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+ APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
+PARTITIONED BY DAY
\ No newline at end of file
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
index 424d070529..37647fde87 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -43,6 +43,7 @@ import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.SqlResourceTestClient;
import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.testng.Assert;
import java.util.ArrayList;
import java.util.Collections;
@@ -95,7 +96,15 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
*/
public SqlTaskStatus submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException
{
- return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null));
+ return submitMsqTask(sqlQueryString, ImmutableMap.of());
+ }
+
+ /**
+ * Submits a task to the MSQ API with the given query string, default headers, and custom context parameters
+ */
+ public SqlTaskStatus submitMsqTask(String sqlQueryString, Map<String, Object> context) throws ExecutionException, InterruptedException
+ {
+ return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, context, null));
}
// Run the task, wait for it to complete, fetch the reports, verify the results,
@@ -154,6 +163,7 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
throw new TaskStillRunningException();
},
(Throwable t) -> t instanceof TaskStillRunningException,
+ 99,
100
);
}
@@ -250,6 +260,25 @@ public class MsqTestQueryHelper extends AbstractTestQueryHelper<MsqQueryWithResu
}
}
+ /**
+ * Submits a SQL query string to the MSQ API for execution. This method waits for the created task to complete.
+ */
+ public void submitMsqTaskAndWaitForCompletion(String sqlQueryString, Map<String, Object> context)
+ throws Exception
+ {
+ SqlTaskStatus sqlTaskStatus = submitMsqTask(sqlQueryString, context);
+
+ LOG.info("Sql Task submitted with task Id - %s",
sqlTaskStatus.getTaskId());
+
+ if (sqlTaskStatus.getState().isFailure()) {
+ Assert.fail(StringUtils.format(
+ "Unable to start the task successfully.\nPossible exception: %s",
+ sqlTaskStatus.getError()
+ ));
+ }
+ pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
+ }
+
private static class TaskStillRunningException extends Exception
{
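[Editorial note: the new submitMsqTaskAndWaitForCompletion(String, Map) overload above is what AbstractITSQLBasedIngestion relies on. A minimal usage sketch, assuming MsqTestQueryHelper is injected into a test class; the SQL string and datasource name are illustrative, while the method and context keys come from this diff.]

    // Sketch only: calling the overload added in this commit from a test class
    // that has msqHelper injected, as the new ITs in this change do.
    Map<String, Object> context = ImmutableMap.of(
        "finalizeAggregations", false,
        "maxNumTasks", 5,
        "groupByEnableMultiValueUnnesting", false
    );
    // "example_ds" is an illustrative datasource name, not one used by these tests.
    String sql = "REPLACE INTO \"example_ds\" OVERWRITE ALL SELECT ... PARTITIONED BY DAY";
    msqHelper.submitMsqTaskAndWaitForCompletion(sql, context); // submits the task, then polls until it completes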
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]