paul-rogers commented on code in PR #13535:
URL: https://github.com/apache/druid/pull/13535#discussion_r1047483660


##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import junitparams.Parameters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must:
+ * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and
+ *    -Ddruid.test.config.cloudPath or setting "cloud_bucket" and "cloud_path" in the config file.
+ * 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
+ *    located in integration-tests/src/test/resources/data/batch_index/json to your S3 at the location set in step 1.
+ * 3) Provide -Doverride.config.path=<PATH_TO_FILE> with s3 credentials/configs set. See

Review Comment:
   I'm not sure if the new ITs implement this feature. If they did, we'd have no good way in the new ITs to pass this information into the test, as we use the same Java command line for all tests. In the new IT framework, we prefer to use env vars, not command line options.
   
   I wonder, are all these comments just copy/pasted from old tests without adjusting them for the new IT framework? If so, we're doing future readers a disservice by providing incorrect info. If we don't have time to provide correct comments, just remove the comments and let the developer rediscover how things work without being sent down the wrong path.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import junitparams.Parameters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must:
+ * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and

Review Comment:
   Is this comment accurate? We discussed setting the env var directly. For the above to work, something would have to pass the flags to the test process, but the new ITs are not set up to do that.



##########
integration-tests-ex/cases/cluster/S3DeepStorage/docker-compose.yaml:
##########
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+networks:
+  druid-it-net:
+    name: druid-it-net
+    ipam:
+      config:
+        - subnet: 172.172.172.0/24
+
+services:
+  zookeeper:
+    extends:
+      file: ../Common/dependencies.yaml
+      service: zookeeper
+
+  metadata:
+    extends:
+      file: ../Common/dependencies.yaml
+      service: metadata
+
+  coordinator:
+    extends:
+      file: ../Common/druid.yaml
+      service: coordinator
+    container_name: coordinator
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+      # The frequency with which the coordinator polls the database
+      # for changes. The DB population code has to wait at least this
+      # long for the coordinator to notice changes.
+      - druid_manager_segments_pollDuration=PT5S
+      - druid_coordinator_period=PT10S
+      - AWS_REGION=${AWS_REGION}
+      - druid_s3_accessKey=${AWS_ACCESS_KEY_ID}
+      - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY}
+    depends_on:
+      - zookeeper
+      - metadata
+
+  overlord:
+    extends:
+      file: ../Common/druid.yaml
+      service: overlord
+    container_name: overlord
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+      - AWS_REGION=${AWS_REGION}
+      - druid_s3_accessKey=${AWS_ACCESS_KEY_ID}
+      - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY}
+    depends_on:
+      - zookeeper
+      - metadata
+
+  broker:
+    extends:
+      file: ../Common/druid.yaml
+      service: broker
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+      - AWS_REGION=${AWS_REGION}
+      - druid_s3_accessKey=${AWS_ACCESS_KEY_ID}
+      - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY}
+    depends_on:
+      - zookeeper
+
+  router:
+    extends:
+      file: ../Common/druid.yaml
+      service: router
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+      - AWS_REGION=${AWS_REGION}
+      - druid_s3_accessKey=${AWS_ACCESS_KEY_ID}
+      - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY}
+    depends_on:
+      - zookeeper
+
+  historical:
+    extends:
+      file: ../Common/druid.yaml
+      service: historical
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+      - AWS_REGION=${AWS_REGION}
+      - druid_s3_accessKey=${AWS_ACCESS_KEY_ID}
+      - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY}
+    depends_on:
+      - zookeeper
+
+  indexer:
+    extends:
+      file: ../Common/druid.yaml
+      service: indexer
+    environment:
+      - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
+      - druid_test_loadList=druid-s3-extensions
+      - druid_storage_type=s3
+      - druid_storage_bucket=${DRUID_CLOUD_BUCKET}
+      # Using DRUID_CLOUD_PATH env as baseKey as well.
+      - druid_storage_baseKey=${DRUID_CLOUD_PATH}
+      - druid_s3_accessKey=${AWS_ACCESS_KEY_ID}
+      - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY}

Review Comment:
   Let's document the required env vars somewhere. Perhaps we add a Markdown page that lists the requirements for each test. Or, we document these at the top of the Java file. Be sure to list the env vars used by the server vs. those used by the test.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import junitparams.Parameters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must:
+ * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and
+ *    -Ddruid.test.config.cloudPath or setting "cloud_bucket" and "cloud_path" in the config file.
+ * 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json
+ *    located in integration-tests/src/test/resources/data/batch_index/json to your S3 at the location set in step 1.

Review Comment:
   A comment somewhere said that the test itself is doing this step. If this test were run as part of a build, we'd have to insert this step into the script which launches the test. Did we do so?



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static AmazonS3 s3Client()
+  {
+    AWSCredentials credentials = new BasicAWSCredentials(
+        System.getenv("AWS_ACCESS_KEY_ID"),
+        System.getenv("AWS_SECRET_ACCESS_KEY")

Review Comment:
   A common error will be to leave one or both of these unset. Even though this 
is a test, and we want to go fast, future users will appreciate it if we check 
if these values are set and give a good error if not set.
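
   A minimal sketch of the kind of fail-fast check being asked for here (the helper name `requireEnv` is hypothetical, not part of the PR):

   ```java
   // Hypothetical fail-fast helper: resolve a required env var up front and
   // fail with an actionable message instead of passing null to the AWS client.
   class RequiredEnv
   {
     static String requireEnv(String name)
     {
       String value = System.getenv(name);
       if (value == null || value.isEmpty()) {
         throw new IllegalStateException(
             "Required environment variable [" + name + "] is not set; "
             + "set DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, "
             + "AWS_SECRET_ACCESS_KEY and AWS_REGION before running this test."
         );
       }
       return value;
     }
   }
   ```

   `s3Client()` would then call `RequiredEnv.requireEnv("AWS_ACCESS_KEY_ID")` and so on, so a missing credential fails the test immediately rather than surfacing later as an opaque AWS SDK error.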



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static AmazonS3 s3Client()
+  {
+    AWSCredentials credentials = new BasicAWSCredentials(
+        System.getenv("AWS_ACCESS_KEY_ID"),
+        System.getenv("AWS_SECRET_ACCESS_KEY")
+    );
+    return AmazonS3ClientBuilder
+        .standard()
+        .withCredentials(new AWSStaticCredentialsProvider(credentials))
+        .withRegion(System.getenv("AWS_REGION"))

Review Comment:
   Again, check the value and give an error.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()

Review Comment:
   This seems a clone of the `resources` in `AbstractS3InputSourceParallelIndexTest`. Even though we want to move fast, it is still helpful to follow good practices and factor out common code.
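
   A rough sketch of the factoring suggested here, using plain JDK collections in place of the PR's Guava/`Pair` types (the class and method names are hypothetical, not the PR's actual code):

   ```java
   import java.util.List;
   import java.util.Map;

   // Hypothetical shared base: defines the three S3 input-source shapes
   // (uris / prefixes / objects) once, so the MSQ test and the parallel-index
   // test can both build their @Parameters arrays from it instead of
   // duplicating the same literals.
   abstract class S3InputSourceCases
   {
     static final String[] WIKIPEDIA_FILES = {
         "wikipedia_index_data1.json",
         "wikipedia_index_data2.json",
         "wikipedia_index_data3.json"
     };

     static Object[][] inputSourceCases()
     {
       return new Object[][]{
           {"uris", List.of(
               "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_FILES[0],
               "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_FILES[1],
               "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_FILES[2])},
           {"prefixes", List.of("s3://%%BUCKET%%/%%PATH%%/")},
           {"objects", List.of(
               Map.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_FILES[0]),
               Map.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_FILES[1]),
               Map.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_FILES[2]))}
       };
     }
   }
   ```

   Each subclass's `test_cases()` would then delegate to `inputSourceCases()`, keeping the `%%BUCKET%%`/`%%PATH%%` templates in one place.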



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static AmazonS3 s3Client()
+  {
+    AWSCredentials credentials = new BasicAWSCredentials(
+        System.getenv("AWS_ACCESS_KEY_ID"),
+        System.getenv("AWS_SECRET_ACCESS_KEY")
+    );
+    return AmazonS3ClientBuilder
+        .standard()
+        .withCredentials(new AWSStaticCredentialsProvider(credentials))
+        .withRegion(System.getenv("AWS_REGION"))
+        .build();
+  }
+
+  public static String[] fileList()
+  {
+    return new String[] {
+        WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3
+    };
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      s3Client.putObject(
+          System.getenv("DRUID_CLOUD_BUCKET"),
+          System.getenv("DRUID_CLOUD_PATH") + "/" + file,
+          new File(localPath + file)
+      );
+    }
+  }
+
+  @AfterClass
+  public static void deleteFilesFromS3()
+  {
+    // Delete uploaded data files
+    DeleteObjectsRequest delObjReq = new DeleteObjectsRequest(System.getenv("DRUID_CLOUD_BUCKET"))
+        .withKeys(fileList());
+    s3Client.deleteObjects(delObjReq);
+
+    // Delete segments created by druid
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
+        .withBucketName(System.getenv("DRUID_CLOUD_BUCKET"))
+        .withPrefix(System.getenv("DRUID_CLOUD_PATH") + "/" + datasource + "/");

Review Comment:
   See comments above.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion

Review Comment:
   Some explanation would be helpful. I see code to upload and delete files. 
There seems to be a step to run an MSQ query. Not sure if there is a step to 
validate the results. Please add a note to explain what this test does. I, as a 
reviewer, don't want to have to reverse-engineer the intent from the code.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static AmazonS3 s3Client()
+  {
+    AWSCredentials credentials = new BasicAWSCredentials(
+        System.getenv("AWS_ACCESS_KEY_ID"),
+        System.getenv("AWS_SECRET_ACCESS_KEY")
+    );
+    return AmazonS3ClientBuilder
+        .standard()
+        .withCredentials(new AWSStaticCredentialsProvider(credentials))
+        .withRegion(System.getenv("AWS_REGION"))
+        .build();
+  }
+
+  public static String[] fileList()
+  {
+    return new String[] {
+        WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3
+    };
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      s3Client.putObject(
+          System.getenv("DRUID_CLOUD_BUCKET"),
+          System.getenv("DRUID_CLOUD_PATH") + "/" + file,
+          new File(localPath + file)
+      );
+    }
+  }
+
+  @AfterClass
+  public static void deleteFilesFromS3()
+  {
+    // Delete uploaded data files
+    DeleteObjectsRequest delObjReq = new DeleteObjectsRequest(System.getenv("DRUID_CLOUD_BUCKET"))
+        .withKeys(fileList());
+    s3Client.deleteObjects(delObjReq);
+
+    // Delete segments created by druid
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
+        .withBucketName(System.getenv("DRUID_CLOUD_BUCKET"))
+        .withPrefix(System.getenv("DRUID_CLOUD_PATH") + "/" + datasource + "/");
+
+    ObjectListing objectListing = s3Client.listObjects(listObjectsRequest);
+
+    while (true) {
+      for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+        s3Client.deleteObject(System.getenv("DRUID_CLOUD_BUCKET"), objectSummary.getKey());
+      }
+      if (objectListing.isTruncated()) {
+        objectListing = s3Client.listNextBatchOfObjects(objectListing);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test
+  @Parameters(method = "test_cases")
+  @TestCaseName("Test_{index} ({0})")
+  public void testSQLBasedBatchIngestion(Pair<String, List> s3InputSource)
+  {
+    try {
+      String sqlTask = getStringFromFileAndReplaceDatasource(CLOUD_INGEST_SQL, datasource);
+      String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs);
+      Map<String, Object> context = ImmutableMap.of("finalizeAggregations", false,
+                                                    "maxNumTasks", 5,
+                                                    "groupByEnableMultiValueUnnesting", false);
+
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%INPUT_SOURCE_PROPERTY_KEY%%",
+          s3InputSource.lhs
+      );
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%INPUT_SOURCE_PROPERTY_VALUE%%",
+          inputSourceValue
+      );
+
+      // Setting the correct object path in the sqlTask.
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%BUCKET%%",
+          config.getCloudBucket() // Getting from DRUID_CLOUD_BUCKET env variable
+      );
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%PATH%%",
+          config.getCloudPath() // Getting from DRUID_CLOUD_PATH env variable
+      );
+
+      submitTask(sqlTask, datasource, context);
+      doTestQuery(INDEX_QUERIES_FILE, datasource);

Review Comment:
   Does this verify the results by running queries against the uploaded data? 
If so, please add a comment to note this.
   
   If this does not verify results, then we need to add a step to do so. Else, 
this is not much of a test since a simple direct return from `submitTask()` 
(while doing nothing) would count as a success.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static AmazonS3 s3Client()
+  {
+    AWSCredentials credentials = new BasicAWSCredentials(
+        System.getenv("AWS_ACCESS_KEY_ID"),
+        System.getenv("AWS_SECRET_ACCESS_KEY")
+    );
+    return AmazonS3ClientBuilder
+        .standard()
+        .withCredentials(new AWSStaticCredentialsProvider(credentials))
+        .withRegion(System.getenv("AWS_REGION"))
+        .build();
+  }
+
+  public static String[] fileList()
+  {
+    return new String[] {
+        WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3
+    };
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()

Review Comment:
   This function has no try-catch. If one upload fails, we don't delete the files already uploaded, which will cause the next test run to fail. It is worth not only getting code done quickly, but also getting it done well, to prevent cascading errors.
   
   So, please provide try/catch blocks that ensure that files are cleaned up 
even if something fails.
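   The rollback this comment asks for could be sketched roughly as below. The `Store` interface and `UploadWithCleanup` class are illustrative stand-ins for the `AmazonS3` client calls, not code from the PR:

```java
import java.util.ArrayList;
import java.util.List;

public class UploadWithCleanup
{
  // Stand-in for the AmazonS3 putObject/deleteObject calls.
  interface Store
  {
    void put(String key) throws Exception;

    void delete(String key);
  }

  // Upload all files; if any upload fails, delete the ones already
  // uploaded so the next test run starts from a clean bucket.
  static void uploadAll(Store store, String[] files) throws Exception
  {
    List<String> uploaded = new ArrayList<>();
    try {
      for (String file : files) {
        store.put(file);
        uploaded.add(file);
      }
    }
    catch (Exception e) {
      // Roll back the partial upload before propagating the failure.
      for (String key : uploaded) {
        store.delete(key);
      }
      throw e;
    }
  }
}
```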



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static AmazonS3 s3Client()
+  {
+    AWSCredentials credentials = new BasicAWSCredentials(
+        System.getenv("AWS_ACCESS_KEY_ID"),
+        System.getenv("AWS_SECRET_ACCESS_KEY")
+    );
+    return AmazonS3ClientBuilder
+        .standard()
+        .withCredentials(new AWSStaticCredentialsProvider(credentials))
+        .withRegion(System.getenv("AWS_REGION"))
+        .build();
+  }
+
+  public static String[] fileList()
+  {
+    return new String[] {
+        WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3
+    };
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      s3Client.putObject(
+          System.getenv("DRUID_CLOUD_BUCKET"),
+          System.getenv("DRUID_CLOUD_PATH") + "/" + file,

Review Comment:
   Please check that the env var is set. This code appears to be a quick hack to get things done ASAP, without normal coding practices: it bypasses the mechanism that exists for this Druid-specific value, does not check whether the value is set, and fetches the env var for every object rather than fetching it once and reusing the value (which is what the existing code does).
   
   I'm hesitant to approve code that is overly quick and dirty.
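   One way to address both points is to validate and cache each setting once. This is a hedged sketch; `EnvConfig.require` is a hypothetical helper, not the existing Druid config mechanism:

```java
import java.util.Map;

public class EnvConfig
{
  // Look up a required setting once and fail fast with a clear message,
  // instead of calling System.getenv() repeatedly and risking nulls.
  static String require(Map<String, String> env, String name)
  {
    String value = env.get(name);
    if (value == null || value.isEmpty()) {
      throw new IllegalStateException("Required env variable not set: " + name);
    }
    return value;
  }
}
```

   The setup code would then call something like `EnvConfig.require(System.getenv(), "DRUID_CLOUD_BUCKET")` once, store the result in a field, and reuse it for every object.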



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static AmazonS3 s3Client = s3Client();
+  private static String datasource = "wikipedia_cloud_index_msq";
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] test_cases()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static AmazonS3 s3Client()
+  {
+    AWSCredentials credentials = new BasicAWSCredentials(
+        System.getenv("AWS_ACCESS_KEY_ID"),
+        System.getenv("AWS_SECRET_ACCESS_KEY")
+    );
+    return AmazonS3ClientBuilder
+        .standard()
+        .withCredentials(new AWSStaticCredentialsProvider(credentials))
+        .withRegion(System.getenv("AWS_REGION"))
+        .build();
+  }
+
+  public static String[] fileList()
+  {
+    return new String[] {
+        WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3
+    };
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      s3Client.putObject(
+          System.getenv("DRUID_CLOUD_BUCKET"),
+          System.getenv("DRUID_CLOUD_PATH") + "/" + file,
+          new File(localPath + file)
+      );
+    }
+  }
+
+  @AfterClass
+  public static void deleteFilesFromS3()
+  {
+    // Delete uploaded data files
+    DeleteObjectsRequest delObjReq = new DeleteObjectsRequest(System.getenv("DRUID_CLOUD_BUCKET"))
+        .withKeys(fileList());
+    s3Client.deleteObjects(delObjReq);
+
+    // Delete segments created by druid
+    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
+        .withBucketName(System.getenv("DRUID_CLOUD_BUCKET"))
+        .withPrefix(System.getenv("DRUID_CLOUD_PATH") + "/" + datasource + "/");
+
+    ObjectListing objectListing = s3Client.listObjects(listObjectsRequest);
+
+    while (true) {
+      for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
+        s3Client.deleteObject(System.getenv("DRUID_CLOUD_BUCKET"), objectSummary.getKey());
+      }
+      if (objectListing.isTruncated()) {
+        objectListing = s3Client.listNextBatchOfObjects(objectListing);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test
+  @Parameters(method = "test_cases")
+  @TestCaseName("Test_{index} ({0})")
+  public void testSQLBasedBatchIngestion(Pair<String, List> s3InputSource)
+  {
+    try {
+      String sqlTask = getStringFromFileAndReplaceDatasource(CLOUD_INGEST_SQL, datasource);
+      String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs);
+      Map<String, Object> context = ImmutableMap.of("finalizeAggregations", false,
+                                                    "maxNumTasks", 5,
+                                                    "groupByEnableMultiValueUnnesting", false);
+
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%INPUT_SOURCE_PROPERTY_KEY%%",
+          s3InputSource.lhs
+      );
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%INPUT_SOURCE_PROPERTY_VALUE%%",
+          inputSourceValue
+      );
+
+      // Setting the correct object path in the sqlTask.
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%BUCKET%%",
+          config.getCloudBucket() // Getting from DRUID_CLOUD_BUCKET env variable
+      );
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%PATH%%",
+          config.getCloudPath() // Getting from DRUID_CLOUD_PATH env variable
+      );
+
+      submitTask(sqlTask, datasource, context);

Review Comment:
   Does this submit the MSQ ingestion task? If so, please add a comment to note 
this.
   
   For MSQ, we have to submit a task to a specific endpoint, poll to wait for 
completion, and check completion status. Do we do that somewhere?
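   
   For reference, the submit/poll/check flow described above can be sketched generically as below. The `statusCheck` supplier stands in for whatever client call reports the task state; this is not the actual Druid client API:

```java
import java.util.function.Supplier;

public class TaskPoller
{
  // Poll a task's status until it reaches a terminal state, or give up
  // after a bounded number of attempts.
  static String waitForCompletion(Supplier<String> statusCheck, int maxAttempts, long sleepMillis)
      throws InterruptedException
  {
    for (int i = 0; i < maxAttempts; i++) {
      String status = statusCheck.get();
      if ("SUCCESS".equals(status) || "FAILED".equals(status)) {
        return status;
      }
      Thread.sleep(sleepMillis);
    }
    throw new IllegalStateException("Task did not reach a terminal state in time");
  }
}
```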



##########
integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_cloud_index_msq.sql:
##########
@@ -0,0 +1,32 @@
+REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL
+WITH "source" as (SELECT * FROM TABLE(
+  EXTERN(
+    '{"type":"s3","%%INPUT_SOURCE_PROPERTY_KEY%%":%%INPUT_SOURCE_PROPERTY_VALUE%%}',
+    '{"type":"json"}',
+    '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]'
+  )
+))
+SELECT
+  TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time,
+  "page",
+  "language",
+  "user",
+  "unpatrolled",
+  "newPage",
+  "robot",
+  "anonymous",
+  "namespace",
+  "continent",
+  "country",
+  "region",
+  "city",
+  COUNT(*) AS "count",
+  SUM("added") AS "added",
+  SUM("deleted") AS "deleted",
+  SUM("delta") AS "delta",
+  APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch",
+  DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch",
+  APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild"
+FROM "source"
+GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13
+PARTITIONED BY DAY

Review Comment:
   Missing newline at end of file.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION

Review Comment:
   Please explain what these should be set to.
   
   `DRUID_CLOUD_BUCKET`: The full URL or just a name?
   
   `DRUID_CLOUD_PATH`: Same question. What is the relationship between this item and the one above?
   
   `AWS_*`: these are presumably the standard AWS settings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

