This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5172d76a67 Migrate current integration batch tests to equivalent MSQ 
tests (#13374)
5172d76a67 is described below

commit 5172d76a6720a217c321e90b9294435ca60242ea
Author: abhagraw <[email protected]>
AuthorDate: Mon Nov 21 09:12:02 2022 +0530

    Migrate current integration batch tests to equivalent MSQ tests (#13374)
    
    * Migrate current integration batch tests to equivalent MSQ tests using new 
IT framework
    
    * Fix build issues
    
    * Trigger Build
    
    * Adding more tests and addressing comments
    
    * fixBuildIssues
    
    * fix dependency issues
    
    * Parameterized the test and addressed comments
    
    * Addressing comments
    
    * fixing checkstyle errors
    
    * Addressing comments
---
 .travis.yml                                        |  16 +--
 distribution/pom.xml                               |   2 +
 integration-tests-ex/cases/pom.xml                 |  19 ++-
 .../testsEx/msq/AbstractITSQLBasedIngestion.java   | 121 +++++++++++++++++
 .../testsEx/msq/ITSQLBasedBatchIngestion.java      |  71 ++++++++++
 .../batch-index/json_path_index_queries.json       |  49 +++++++
 .../multi-stage-query/batch-index/msq_inline.sql   |  17 +++
 .../batch-index/sparse_column_msq.json             |  93 +++++++++++++
 .../batch-index/sparse_column_msq.sql              |  21 +++
 .../batch-index/wikipedia_http_inputsource_msq.sql |  29 ++++
 .../wikipedia_http_inputsource_queries.json        |  47 +++++++
 .../batch-index/wikipedia_index_msq.sql            |  32 +++++
 .../batch-index/wikipedia_index_queries.json       | 150 +++++++++++++++++++++
 .../wikipedia_index_queries_with_transform.json    |  62 +++++++++
 .../wikipedia_index_task_with_transform.sql        |  32 +++++
 .../batch-index/wikipedia_merge_index_task.sql     |  33 +++++
 .../druid/testing/utils/MsqTestQueryHelper.java    |  31 ++++-
 17 files changed, 808 insertions(+), 17 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 2fd32ffd7b..2d785486ac 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -442,10 +442,6 @@ jobs:
           docker exec -it druid-$v sh -c 'dmesg | tail -3' ;
           done
 
-    - <<: *integration_batch_index
-      name: "(Compile=openjdk8, Run=openjdk8) batch index integration test 
with Indexer"
-      env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' 
USE_INDEXER='indexer'
-
     - &integration_input_format
       name: "(Compile=openjdk8, Run=openjdk8) input format integration test"
       stage: Tests - phase 2
@@ -689,11 +685,13 @@ jobs:
       env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager'
       script: ./it.sh travis Catalog
 
-   # Disabling BatchIndex test as it is failing with due to timeout, fixing it 
will be taken in a separate PR.
-    #- <<: *integration_tests_ex
-    #  name: "(Compile=openjdk8, Run=openjdk8) batch index integration test 
with Indexer (new)"
-    #  env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
-    #  script: ./it.sh travis BatchIndex
+    - &integration_tests_ex
+      name: "(Compile=openjdk8, Run=openjdk8) batch index integration test 
with Indexer (new)"
+      stage: Tests - phase 2
+      jdk: openjdk8
+      services: *integration_test_services
+      env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer'
+      script: ./it.sh travis BatchIndex
 
     # END - Integration tests for Compile with Java 8 and Run with Java 8
 
diff --git a/distribution/pom.xml b/distribution/pom.xml
index b73e4215a6..e2b7773b09 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -225,6 +225,8 @@
                                         <argument>-c</argument>
                                         
<argument>org.apache.druid.extensions:druid-multi-stage-query</argument>
                                         <argument>-c</argument>
+                                        
<argument>org.apache.druid.extensions:druid-catalog</argument>
+                                        <argument>-c</argument>
                                         
<argument>org.apache.druid.extensions:druid-protobuf-extensions</argument>
                                         <argument>-c</argument>
                                         
<argument>org.apache.druid.extensions:mysql-metadata-storage</argument>
diff --git a/integration-tests-ex/cases/pom.xml 
b/integration-tests-ex/cases/pom.xml
index 5456d4b81b..cf781f6f88 100644
--- a/integration-tests-ex/cases/pom.xml
+++ b/integration-tests-ex/cases/pom.xml
@@ -155,7 +155,7 @@
             <groupId>org.jdbi</groupId>
             <artifactId>jdbi</artifactId>
         </dependency>
-       <dependency>
+        <dependency>
             <groupId>org.apache.druid.extensions</groupId>
             <artifactId>mysql-metadata-storage</artifactId>
             <version>${project.parent.version}</version>
@@ -218,10 +218,15 @@
             <artifactId>JUnitParams</artifactId>
             <scope>test</scope>
         </dependency>
-               <dependency>
-                   <groupId>javax.ws.rs</groupId>
-                   <artifactId>jsr311-api</artifactId>
-               </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>jsr311-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-client</artifactId>
+            <version>5.4.0</version>
+        </dependency>
     </dependencies>
 
     <!-- Exclude ITs from surefire. Required because they end with "Test". -->
@@ -345,8 +350,8 @@
                                 <goal>integration-test</goal>
                             </goals>
                             <configuration>
-                                <!-- our tests are very verbose, let's keep 
the volume down -->
-                                
<redirectTestOutputToFile>true</redirectTestOutputToFile>
+                                <!-- Turning on logs so that travis does not 
time out tests for not providing any output.  -->
+                                
<redirectTestOutputToFile>False</redirectTestOutputToFile>
                                 <!-- Can run only one test category per Maven 
run. -->
                                 
<groups>org.apache.druid.testsEx.categories.${it.category}</groups>
                              </configuration>
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
new file mode 100644
index 0000000000..4bb1cdc478
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.testing.utils.DataLoaderHelper;
+import org.apache.druid.testing.utils.MsqTestQueryHelper;
+import org.apache.druid.testing.utils.TestQueryHelper;
+import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class AbstractITSQLBasedIngestion
+{
+  public static final Logger LOG = new Logger(TestQueryHelper.class);
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  protected TestQueryHelper queryHelper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  /**
+   * Reads file as utf-8 string and replaces %%DATASOURCE%% with the provided 
datasource value.
+   */
+  protected String getStringFromFileAndReplaceDatasource(String filePath, 
String datasource)
+  {
+    String fileString;
+    try {
+      InputStream is = 
AbstractITBatchIndexTest.class.getResourceAsStream(filePath);
+      fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "could not read query file: %s", filePath);
+    }
+
+    fileString = StringUtils.replace(
+        fileString,
+        "%%DATASOURCE%%",
+        datasource
+    );
+
+    return fileString;
+  }
+
+  /**
+   * Reads native queries from a file and runs against the provided datasource.
+   */
+  protected void doTestQuery(String queryFilePath, String dataSource)
+  {
+    try {
+      String query = getStringFromFileAndReplaceDatasource(queryFilePath, 
dataSource);
+      queryHelper.testQueriesFromString(query);
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while running test query");
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Submits a sqlTask, waits for task completion and then runs test queries on 
ingested datasource.
+   */
+  protected void submitTaskAnddoTestQuery(String sqlTask, String 
queryFilePath, String datasource,
+                                          Map<String, Object> msqContext) 
throws Exception
+  {
+    LOG.info("SqlTask - \n %s", sqlTask);
+
+    // Submit the tasks and wait for the datasource to get loaded
+    msqHelper.submitMsqTaskAndWaitForCompletion(
+        sqlTask,
+        msqContext
+    );
+
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+    doTestQuery(queryFilePath, datasource);
+  }
+
+  /**
+   * Runs a MSQ ingest sql test.
+   *
+   * @param  sqlFilePath path of file containing the sql query.
+   * @param  queryFilePath path of file containing the native test queries to 
be run on the ingested datasource.
+   * @param  datasource name of the datasource. %%DATASOURCE%% in the sql and 
queries will be replaced with this value.
+   * @param  msqContext context parameters to be passed with MSQ API call.
+   */
+  protected void runMSQTaskandTestQueries(String sqlFilePath, String 
queryFilePath, String datasource,
+                                          Map<String, Object> msqContext) 
throws Exception
+  {
+    LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath);
+
+    String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, 
datasource);
+    submitTaskAnddoTestQuery(sqlTask, queryFilePath, datasource, msqContext);
+  }
+}
diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
new file mode 100644
index 0000000000..dbc26d7d40
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import junitparams.Parameters;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedIngestion
+{
+  private static final String BATCH_INDEX_TASKS_DIR = 
"/multi-stage-query/batch-index/";
+
+  public static List<List<String>> test_cases()
+  {
+    return Arrays.asList(
+        Arrays.asList("msq_inline.sql", "json_path_index_queries.json"),
+        Arrays.asList("sparse_column_msq.sql", "sparse_column_msq.json"),
+        Arrays.asList("wikipedia_http_inputsource_msq.sql", 
"wikipedia_http_inputsource_queries.json"),
+        Arrays.asList("wikipedia_index_msq.sql", 
"wikipedia_index_queries.json"),
+        Arrays.asList("wikipedia_merge_index_task.sql", 
"wikipedia_index_queries.json"),
+        Arrays.asList("wikipedia_index_task_with_transform.sql", 
"wikipedia_index_queries_with_transform.json")
+    );
+
+  }
+
+  @Test
+  @Parameters(method = "test_cases")
+  public void testSQLBasedBatchIngestion(String sqlFileName, String 
queryFileName)
+  {
+    try {
+      runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIR + sqlFileName,
+                               BATCH_INDEX_TASKS_DIR + queryFileName,
+                               FilenameUtils.removeExtension(sqlFileName),
+                               ImmutableMap.of("finalizeAggregations", false,
+                                               "maxNumTasks", 5,
+                                               
"groupByEnableMultiValueUnnesting", false
+                               ));
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while testing [%s, %s]", sqlFileName, queryFileName);
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
new file mode 100644
index 0000000000..845af00dd8
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json
@@ -0,0 +1,49 @@
+[
+  {
+    "description": "timeseries",
+    "query": {
+      "queryType": "timeseries",
+      "dataSource": "%%DATASOURCE%%",
+      "intervals": [
+        "1000/3000"
+      ],
+      "aggregations": [
+        {
+          "type": "longSum",
+          "name": "len",
+          "fieldName": "len"
+        },
+        {
+          "type": "longSum",
+          "name": "max",
+          "fieldName": "max"
+        },
+        {
+          "type": "longSum",
+          "name": "min",
+          "fieldName": "min"
+        },
+        {
+          "type": "longSum",
+          "name": "sum",
+          "fieldName": "sum"
+        }
+      ],
+      "granularity": {
+        "type": "all"
+      }
+    },
+    "expectedResults": [
+      {
+        "timestamp": "2013-08-31T01:02:33.000Z",
+        "result": {
+          "sum": 10,
+          "min": 0,
+          "len": 5,
+          "max": 4
+        }
+      }
+    ]
+  }
+]
+
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
new file mode 100644
index 0000000000..a710691574
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql
@@ -0,0 +1,17 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+  EXTERN(
+    '{"type":"inline","data":"{\"timestamp\": \"2013-08-31T01:02:33Z\", 
\"values\": [0,1,2,3,4] }"}',
+    
'{"type":"json","flattenSpec":{"useFieldDiscovery":true,"fields":[{"type":"path","name":"len","expr":"$.values.length()"},{"type":"path","name":"min","expr":"$.values.min()"},{"type":"path","name":"max","expr":"$.values.max()"},{"type":"path","name":"sum","expr":"$.values.sum()"}]}}',
+    
'[{"name":"timestamp","type":"string"},{"name":"len","type":"long"},{"name":"min","type":"long"},{"name":"max","type":"long"},{"name":"sum","type":"long"}]'
+  )
+))
+SELECT
+  TIME_PARSE("timestamp") AS __time,
+  "len",
+  "min",
+  "max",
+  "sum"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5
+PARTITIONED BY HOUR
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
new file mode 100644
index 0000000000..4c2c5aa295
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json
@@ -0,0 +1,93 @@
+[
+  {
+    "description": "timeseries, 1 agg, all",
+    "query":{
+      "queryType" : "timeBoundary",
+      "dataSource": "%%DATASOURCE%%"
+    },
+    "expectedResults":[
+      {
+        "timestamp" : "2015-09-12T00:00:00.000Z",
+        "result" : {
+          "minTime" : "2015-09-12T00:00:00.000Z",
+          "maxTime" : "2015-09-12T00:00:00.000Z"
+        }
+      }
+    ]
+  },
+  {
+    "description": "scan, all",
+    "query": {
+      "queryType": "scan",
+      "dataSource": "%%DATASOURCE%%",
+      "intervals": [
+        "2013-01-01/2020-01-02"
+      ],
+      "resultFormat":"compactedList"
+    },
+    "expectedResults": [{
+      
"segmentId":"dstsparse_column_msq.json_2015-09-12T00:00:00.000Z_2015-09-12T01:00:00.000Z_2022-11-17T12:32:11.247Z",
+      
"columns":["__time","dimB","dimA","dimC","dimD","dimE","dimF","count","sum_metA"],
+      "events":[
+        [1442016000000,"F","C",null,null,null,null,1,1],
+        [1442016000000,"J","C",null,null,null,null,1,1],
+        [1442016000000,"R","J",null,null,null,null,1,1],
+        [1442016000000,"S","Z",null,null,null,null,1,1],
+        [1442016000000,"T","H",null,null,null,null,1,1],
+        [1442016000000,"X",null,"A",null,null,null,1,1],
+        [1442016000000,"X","H",null,null,null,null,3,3],
+        [1442016000000,"Z","H",null,null,null,null,1,1]
+      ]
+    }],
+    "fieldsToTest": ["events"]
+  },
+  {
+    "description": "roll up ratio",
+    "query": {
+      "queryType":"timeseries",
+      "dataSource":{
+        "type":"table",
+        "name":"%%DATASOURCE%%"
+      },
+      "intervals":{
+        "type":"intervals",
+        "intervals":[
+          "2013-01-01/2020-01-02"
+        ]
+      },
+      "granularity":{
+        "type":"all"
+      },
+      "aggregations":[
+        {
+          "type":"count",
+          "name":"a0"
+        },
+        {
+          "type":"longSum",
+          "name":"a1",
+          "fieldName":"count",
+          "expression":null
+        }
+      ],
+      "postAggregations":[
+        {
+          "type":"expression",
+          "name":"p0",
+          "expression":"((\"a0\" * 1.00) / \"a1\")",
+          "ordering":null
+        }
+      ]
+    },
+    "expectedResults": [
+      {
+        "timestamp" : "2015-09-12T00:00:00.000Z",
+        "result" : {
+          "a1" : 10,
+          "p0" : 0.8,
+          "a0" : 8
+        }
+      }
+    ]
+  }
+]
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
new file mode 100644
index 0000000000..f844f59964
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql
@@ -0,0 +1,21 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+  EXTERN(
+    
'{"type":"inline","data":"{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\
 [...]
+    '{"type":"json"}',
+    
'[{"name":"time","type":"string"},{"name":"dimB","type":"string"},{"name":"dimA","type":"string"},{"name":"dimC","type":"string"},{"name":"dimD","type":"string"},{"name":"dimE","type":"string"},{"name":"dimF","type":"string"},{"name":"metA","type":"long"}]'
+  )
+))
+SELECT
+  TIME_FLOOR(TIME_PARSE("time"), 'PT1H') AS __time,
+  "dimB",
+  "dimA",
+  "dimC",
+  "dimD",
+  "dimE",
+  "dimF",
+  COUNT(*) AS "count",
+  SUM("metA") AS "sum_metA"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7
+PARTITIONED BY HOUR
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
new file mode 100644
index 0000000000..f1af33bed4
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql
@@ -0,0 +1,29 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+  EXTERN(
+    
'{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz","https://druid.apache.org/data/wikipedia.json.gz"]}',
+    '{"type":"json"}',
+    
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double
 [...]
+  )
+))
+SELECT
+  TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN 
MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") 
END, 'PT1S') AS __time,
+  "page",
+  "language",
+  "user",
+  "unpatrolled",
+  "newPage",
+  "robot",
+  "anonymous",
+  "namespace",
+  "continent",
+  "country",
+  "region",
+  "city",
+  COUNT(*) AS "count",
+  SUM("added") AS "added",
+  SUM("deleted") AS "deleted",
+  SUM("delta") AS "delta"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
+PARTITIONED BY DAY
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
new file mode 100644
index 0000000000..2d454d59d8
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json
@@ -0,0 +1,47 @@
+[
+    {
+        "description": "timeseries, 1 agg, all",
+        "query":{
+            "queryType" : "timeBoundary",
+            "dataSource": "%%DATASOURCE%%"
+        },
+        "expectedResults":[
+            {
+                "timestamp" : "2016-06-27T00:00:11.000Z",
+                "result" : {
+                    "minTime" : "2016-06-27T00:00:11.000Z",
+                    "maxTime" : "2016-06-27T21:31:02.000Z"
+                }
+            }
+        ]
+    },
+    {
+        "description": "simple aggr",
+        "query":{
+            "queryType" : "topN",
+            "dataSource" : "%%DATASOURCE%%",
+            "intervals" : ["2016-06-27/2016-06-28"],
+            "granularity" : "all",
+            "dimension" : "page",
+            "metric" : "count",
+            "threshold" : 3,
+            "aggregations" : [
+                {
+                    "type" : "count",
+                    "name" : "count"
+                }
+            ]
+        },
+        "expectedResults":[
+            {
+                "timestamp" : "2016-06-27T00:00:11.000Z",
+                "result" :
+                    [
+                        {"count":29,"page":"Copa América Centenario"},
+                        {"count":16,"page":"User:Cyde/List of candidates for 
speedy deletion/Subpage"},
+                        {"count":16,"page":"Wikipedia:Administrators' 
noticeboard/Incidents"}
+                    ]
+            }
+        ]
+    }
+]
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
new file mode 100644
index 0000000000..738e39fb87
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql
@@ -0,0 +1,32 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" as (SELECT * FROM TABLE(
+  EXTERN(
+    
'{"type":"local","files":["/resources/data/batch_index/json/wikipedia_index_data1.json","/resources/data/batch_index/json/wikipedia_index_data2.json","/resources/data/batch_index/json/wikipedia_index_data3.json"]}',
+    '{"type":"json"}',
+    
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double
 [...]
+  )
+))
+SELECT
+  TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN 
MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") 
END, 'PT1S') AS __time,
+  "page",
+  "language",
+  "user",
+  "unpatrolled",
+  "newPage",
+  "robot",
+  "anonymous",
+  "namespace",
+  "continent",
+  "country",
+  "region",
+  "city",
+  COUNT(*) AS "count",
+  SUM("added") AS "added",
+  SUM("deleted") AS "deleted",
+  SUM("delta") AS "delta",
+  APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+  DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+  APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
+PARTITIONED BY DAY
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
new file mode 100644
index 0000000000..928effe65e
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json
@@ -0,0 +1,150 @@
+[
+    {
+        "description": "timeseries, 1 agg, all",
+        "query":{
+            "queryType" : "timeBoundary",
+            "dataSource": "%%DATASOURCE%%"
+        },
+        "expectedResults":[
+            {
+                "timestamp" : "2013-08-31T01:02:33.000Z",
+                "result" : {
+                    "minTime" : "2013-08-31T01:02:33.000Z",
+                    "maxTime" : "2013-09-01T12:41:27.000Z"
+                }
+            }
+        ]
+    },
+    {
+        "description": "timeseries, datasketch aggs, all",
+        "query":{
+            "queryType" : "timeseries",
+            "dataSource": "%%DATASOURCE%%",
+            "granularity":"day",
+            "intervals":[
+                "2013-08-31T00:00/2013-09-01T00:00"
+            ],
+            "filter":null,
+            "aggregations":[
+                {
+                    "type": "HLLSketchMerge",
+                    "name": "approxCountHLL",
+                    "fieldName": "HLLSketchBuild",
+                    "lgK": 12,
+                    "tgtHllType": "HLL_4",
+                    "round": true
+                },
+                {
+                    "type":"thetaSketch",
+                    "name":"approxCountTheta",
+                    "fieldName":"thetaSketch",
+                    "size":16384,
+                    "shouldFinalize":true,
+                    "isInputThetaSketch":false,
+                    "errorBoundsStdDev":null
+                },
+                {
+                    "type":"quantilesDoublesSketch",
+                    "name":"quantilesSketch",
+                    "fieldName":"quantilesDoublesSketch",
+                    "k":128
+                }
+            ]
+        },
+        "expectedResults":[
+            {
+                "timestamp" : "2013-08-31T00:00:00.000Z",
+                "result" : {
+                    "quantilesSketch":5,
+                    "approxCountTheta":5.0,
+                    "approxCountHLL":5
+                }
+            }
+        ]
+    },
+    {
+        "description":"having spec on post aggregation",
+        "query":{
+            "queryType":"groupBy",
+            "dataSource":"%%DATASOURCE%%",
+            "granularity":"day",
+            "dimensions":[
+                "page"
+            ],
+            "filter":{
+                "type":"selector",
+                "dimension":"language",
+                "value":"zh"
+            },
+            "aggregations":[
+                {
+                    "type":"count",
+                    "name":"rows"
+                },
+                {
+                    "type":"longSum",
+                    "fieldName":"added",
+                    "name":"added_count"
+                }
+            ],
+            "postAggregations": [
+                {
+                    "type":"arithmetic",
+                    "name":"added_count_times_ten",
+                    "fn":"*",
+                    "fields":[
+                        {"type":"fieldAccess", "name":"added_count", 
"fieldName":"added_count"},
+                        {"type":"constant", "name":"const", "value":10}
+                    ]
+                }
+            ],
+            "having":{"type":"greaterThan", 
"aggregation":"added_count_times_ten", "value":9000},
+            "intervals":[
+                "2013-08-31T00:00/2013-09-01T00:00"
+            ]
+        },
+        "expectedResults":[ {
+            "version" : "v1",
+            "timestamp" : "2013-08-31T00:00:00.000Z",
+            "event" : {
+                "added_count_times_ten" : 9050.0,
+                "page" : "Crimson Typhoon",
+                "added_count" : 905,
+                "rows" : 1
+            }
+        } ]
+    },
+    {
+        "description": "timeseries, stringFirst/stringLast aggs, all",
+        "query":{
+            "queryType" : "timeseries",
+            "dataSource": "%%DATASOURCE%%",
+            "granularity":"day",
+            "intervals":[
+                "2013-08-31T00:00/2013-09-01T00:00"
+            ],
+            "filter":null,
+            "aggregations":[
+                {
+                    "type": "stringFirst",
+                    "name": "first_user",
+                    "fieldName": "user"
+                },
+                {
+                    "type":"stringLast",
+                    "name":"last_user",
+                    "fieldName":"user"
+                }
+            ]
+        },
+        "expectedResults":[
+            {
+                "timestamp" : "2013-08-31T00:00:00.000Z",
+                "result" : {
+                    "first_user":"nuclear",
+                    "last_user":"stringer"
+                }
+            }
+        ]
+    }
+]
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
new file mode 100644
index 0000000000..f0cfba6773
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json
@@ -0,0 +1,62 @@
+[
+    {
+        "description":"having spec on post aggregation",
+        "query":{
+            "queryType":"groupBy",
+            "dataSource":"%%DATASOURCE%%",
+            "granularity":"day",
+            "dimensions":[
+                "page",
+                "city"
+            ],
+            "filter":{
+                "type":"selector",
+                "dimension":"language",
+                "value":"language-zh"
+            },
+            "aggregations":[
+                {
+                    "type":"count",
+                    "name":"rows"
+                },
+                {
+                    "type":"longSum",
+                    "fieldName":"triple-added",
+                    "name":"added_count"
+                },
+                {
+                    "type":"longSum",
+                    "fieldName":"delta",
+                    "name":"delta_sum"
+                }
+            ],
+            "postAggregations": [
+                {
+                    "type":"arithmetic",
+                    "name":"added_count_times_ten",
+                    "fn":"*",
+                    "fields":[
+                        {"type":"fieldAccess", "name":"added_count", 
"fieldName":"added_count"},
+                        {"type":"constant", "name":"const", "value":10}
+                    ]
+                }
+            ],
+            "having":{"type":"greaterThan", 
"aggregation":"added_count_times_ten", "value":9000},
+            "intervals":[
+                "2013-08-31T00:00/2013-09-01T00:00"
+            ]
+        },
+        "expectedResults":[ {
+            "version" : "v1",
+            "timestamp" : "2013-08-31T00:00:00.000Z",
+            "event" : {
+                "added_count_times_ten" : 27150.0,
+                "page" : "Crimson Typhoon",
+                "city" : "Taiyuan",
+                "added_count" : 2715,
+                "delta_sum" : 900,
+                "rows" : 1
+            }
+        } ]
+    }
+]
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
new file mode 100644
index 0000000000..ebdeeda689
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql
@@ -0,0 +1,32 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+  EXTERN(
+    
'{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
+    '{"type":"json"}',
+    
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"triple-added","type":"d
 [...]
+  )
+))
+SELECT
+  TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN 
MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") 
END, 'PT1S') AS __time,
+  "page",
+  concat('language-', "language") AS "language",
+  "user",
+  "unpatrolled",
+  "robot",
+  "anonymous",
+  "namespace",
+  "continent",
+  "country",
+  "region",
+  "city",
+  COUNT(*) AS "count",
+  SUM("added") AS "added",
+  SUM("added")*3 AS "triple-added",
+  SUM("deleted") AS "deleted",
+  SUM("delta") AS "delta",
+  APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+  DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+  APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
+PARTITIONED BY DAY
\ No newline at end of file
diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
new file mode 100644
index 0000000000..a8160aa905
--- /dev/null
+++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql
@@ -0,0 +1,33 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" AS (SELECT * FROM TABLE(
+  EXTERN(
+    
'{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}',
+    '{"type":"json"}',
+    
'[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double
 [...]
+  )
+))
+SELECT
+  TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN 
MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") 
END, 'PT1S') AS __time,
+  "page",
+  "language",
+  "user",
+  "unpatrolled",
+  "newPage",
+  "robot",
+  "anonymous",
+  "namespace",
+  "continent",
+  "country",
+  "region",
+  "city",
+  "timestamp",
+  COUNT(*) AS "count",
+  SUM("added") AS "added",
+  SUM("deleted") AS "deleted",
+  SUM("delta") AS "delta",
+  APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+  DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+  APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14
+PARTITIONED BY DAY
\ No newline at end of file
diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
index 424d070529..37647fde87 100644
--- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
+++ 
b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java
@@ -43,6 +43,7 @@ import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.clients.SqlResourceTestClient;
 import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.testng.Assert;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -95,7 +96,15 @@ public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResu
    */
   public SqlTaskStatus submitMsqTask(String sqlQueryString) throws 
ExecutionException, InterruptedException
   {
-    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of(), null));
+    return submitMsqTask(sqlQueryString, ImmutableMap.of());
+  }
+
+  /**
+   * Submits a task to the MSQ API with the given query string, default 
headers, and the given context parameters.
+   */
+  public SqlTaskStatus submitMsqTask(String sqlQueryString, Map<String, 
Object> context) throws ExecutionException, InterruptedException
+  {
+    return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, context, null));
   }
 
   // Run the task, wait for it to complete, fetch the reports, verify the 
results,
@@ -154,6 +163,7 @@ public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResu
           throw new TaskStillRunningException();
         },
         (Throwable t) -> t instanceof TaskStillRunningException,
+        99,
         100
     );
   }
@@ -250,6 +260,25 @@ public class MsqTestQueryHelper extends 
AbstractTestQueryHelper<MsqQueryWithResu
     }
   }
 
+  /**
+   * Submits the query string as a {@link SqlQuery} to the MSQ API, then 
waits for the created task to complete.
+   */
+  public void submitMsqTaskAndWaitForCompletion(String sqlQueryString, 
Map<String, Object> context)
+      throws Exception
+  {
+    SqlTaskStatus sqlTaskStatus = submitMsqTask(sqlQueryString, context);
+
+    LOG.info("Sql Task submitted with task Id - %s", 
sqlTaskStatus.getTaskId());
+
+    if (sqlTaskStatus.getState().isFailure()) {
+      Assert.fail(StringUtils.format(
+          "Unable to start the task successfully.\nPossible exception: %s",
+          sqlTaskStatus.getError()
+      ));
+    }
+    pollTaskIdForCompletion(sqlTaskStatus.getTaskId());
+  }
+
   private static class TaskStillRunningException extends Exception
   {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to