paul-rogers commented on code in PR #13535:
URL: https://github.com/apache/druid/pull/13535#discussion_r1052462276


##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java:
##########
@@ -91,16 +97,149 @@ public String getFolderSuffix()
     }
   }
 
-  private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
+  public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
 
-  @Inject
-  protected IntegrationTestingConfig config;
   @Inject
   protected SqlTestQueryHelper sqlQueryHelper;
 
   @Inject
   ClientInfoResourceTestClient clientInfoResourceTestClient;
 
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  protected TestQueryHelper queryHelper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Rule
+  public TestWatcher watchman = new TestWatcher()
+  {
+    @Override
+    public void starting(Description d)
+    {
+      LOG.info("RUNNING %s", d.getDisplayName());
+    }
+
+    @Override
+    public void failed(Throwable e, Description d)
+    {
+      LOG.error("FAILED %s", d.getDisplayName());
+    }
+
+    @Override
+    public void finished(Description d)
+    {
+      LOG.info("FINISHED %s", d.getDisplayName());
+    }
+  };
+
+  /**
+   * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value.
+   */
+  protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource)
+  {
+    String fileString;
+    try {
+      InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath);
+      fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "could not read query file: %s", filePath);
+    }
+
+    fileString = StringUtils.replace(
+        fileString,
+        "%%DATASOURCE%%",
+        datasource
+    );
+
+    return fileString;
+  }
+
+  /**
+   * Reads native queries from a file and runs against the provided datasource.
+   */
+  protected void doTestQuery(String dataSource, String queryFilePath)
+  {
+    try {
+      String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource);
+      queryHelper.testQueriesFromString(query);
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while running test query at path " + queryFilePath);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.
+   */
+  protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception
+  {
+    LOG.info("SqlTask - \n %s", sqlTask);
+
+    // Submit the tasks and wait for the datasource to get loaded
+    msqHelper.submitMsqTaskAndWaitForCompletion(
+        sqlTask,
+        msqContext
+    );
+
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.
+   */
+  protected void submitMSQTaskFromFile(String sqlFilePath, String datasource, Map<String, Object> msqContext) throws Exception
+  {
+    String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource);
+    submitMSQTask(sqlTask, datasource, msqContext);
+  }
+
+  /**
+   * Runs a SQL ingest test.
+   *
+   * @param  sqlFilePath path of file containing the sql query.
+   * @param  queryFilePath path of file containing the native test queries to be run on the ingested datasource.
+   * @param  datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value.
+   * @param  msqContext context parameters to be passed with MSQ API call.
+   */
+  protected void runMSQTaskandTestQueries(String sqlFilePath,
+                                          String queryFilePath,
+                                          String datasource,
+                                          Map<String, Object> msqContext) throws Exception

Review Comment:
   Nit: I think Druid prefers the following formatting:
   
   ```java
     protected void runMSQTaskandTestQueries(
          String sqlFilePath,
          String queryFilePath,
          String datasource,
          Map<String, Object> msqContext
     ) throws Exception
   ```
   
   At least, I always get comments if I forget to format things the above way. We haven't figured out how to get Checkstyle to enforce this style, but it's what we want.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java:
##########
@@ -91,16 +97,149 @@ public String getFolderSuffix()
     }
   }
 
-  private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
+  public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
 
-  @Inject
-  protected IntegrationTestingConfig config;
   @Inject
   protected SqlTestQueryHelper sqlQueryHelper;
 
   @Inject
   ClientInfoResourceTestClient clientInfoResourceTestClient;
 
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  protected TestQueryHelper queryHelper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Rule
+  public TestWatcher watchman = new TestWatcher()
+  {
+    @Override
+    public void starting(Description d)
+    {
+      LOG.info("RUNNING %s", d.getDisplayName());
+    }
+
+    @Override
+    public void failed(Throwable e, Description d)
+    {
+      LOG.error("FAILED %s", d.getDisplayName());
+    }
+
+    @Override
+    public void finished(Description d)
+    {
+      LOG.info("FINISHED %s", d.getDisplayName());
+    }
+  };
+
+  /**
+   * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value.
+   */
+  protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource)
+  {
+    String fileString;
+    try {
+      InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath);
+      fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "could not read query file: %s", filePath);
+    }
+
+    fileString = StringUtils.replace(
+        fileString,
+        "%%DATASOURCE%%",
+        datasource
+    );
+
+    return fileString;
+  }
+
+  /**
+   * Reads native queries from a file and runs against the provided datasource.
+   */
+  protected void doTestQuery(String dataSource, String queryFilePath)
+  {
+    try {
+      String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource);
+      queryHelper.testQueriesFromString(query);
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while running test query at path " + queryFilePath);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.
+   */
+  protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception
+  {
+    LOG.info("SqlTask - \n %s", sqlTask);
+
+    // Submit the tasks and wait for the datasource to get loaded
+    msqHelper.submitMsqTaskAndWaitForCompletion(
+        sqlTask,
+        msqContext
+    );
+
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.

Review Comment:
   Sumits -> Submits



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java:
##########
@@ -91,16 +97,149 @@ public String getFolderSuffix()
     }
   }
 
-  private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
+  public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
 
-  @Inject
-  protected IntegrationTestingConfig config;
   @Inject
   protected SqlTestQueryHelper sqlQueryHelper;
 
   @Inject
   ClientInfoResourceTestClient clientInfoResourceTestClient;
 
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  protected TestQueryHelper queryHelper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Rule
+  public TestWatcher watchman = new TestWatcher()
+  {
+    @Override
+    public void starting(Description d)
+    {
+      LOG.info("RUNNING %s", d.getDisplayName());
+    }
+
+    @Override
+    public void failed(Throwable e, Description d)
+    {
+      LOG.error("FAILED %s", d.getDisplayName());
+    }
+
+    @Override
+    public void finished(Description d)
+    {
+      LOG.info("FINISHED %s", d.getDisplayName());
+    }
+  };
+
+  /**
+   * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value.
+   */
+  protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource)
+  {
+    String fileString;
+    try {
+      InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath);
+      fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "could not read query file: %s", filePath);
+    }
+
+    fileString = StringUtils.replace(
+        fileString,
+        "%%DATASOURCE%%",
+        datasource
+    );
+
+    return fileString;
+  }
+
+  /**
+   * Reads native queries from a file and runs against the provided datasource.
+   */
+  protected void doTestQuery(String dataSource, String queryFilePath)
+  {
+    try {
+      String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource);
+      queryHelper.testQueriesFromString(query);
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while running test query at path " + queryFilePath);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.
+   */
+  protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception
+  {
+    LOG.info("SqlTask - \n %s", sqlTask);
+
+    // Submit the tasks and wait for the datasource to get loaded
+    msqHelper.submitMsqTaskAndWaitForCompletion(
+        sqlTask,
+        msqContext
+    );
+
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.
+   */
+  protected void submitMSQTaskFromFile(String sqlFilePath, String datasource, Map<String, Object> msqContext) throws Exception
+  {
+    String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource);
+    submitMSQTask(sqlTask, datasource, msqContext);
+  }
+
+  /**
+   * Runs a SQL ingest test.
+   *
+   * @param  sqlFilePath path of file containing the sql query.
+   * @param  queryFilePath path of file containing the native test queries to be run on the ingested datasource.
+   * @param  datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value.
+   * @param  msqContext context parameters to be passed with MSQ API call.
+   */
+  protected void runMSQTaskandTestQueries(String sqlFilePath,
+                                          String queryFilePath,
+                                          String datasource,
+                                          Map<String, Object> msqContext) throws Exception
+  {
+    LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath);

Review Comment:
   Nit: the `[ ]` thing we use was probably originally for cases where the value is unformatted. Here, it would be cleaner to say:
   
   ```java
   LOG.info("Starting MSQ test for sql path: %s, query path: %s", sqlFilePath, 
queryFilePath);
   ```



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java:
##########
@@ -91,16 +97,149 @@ public String getFolderSuffix()
     }
   }
 
-  private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
+  public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class);
 
-  @Inject
-  protected IntegrationTestingConfig config;
   @Inject
   protected SqlTestQueryHelper sqlQueryHelper;
 
   @Inject
   ClientInfoResourceTestClient clientInfoResourceTestClient;
 
+  @Inject
+  private MsqTestQueryHelper msqHelper;
+
+  @Inject
+  protected TestQueryHelper queryHelper;
+
+  @Inject
+  private DataLoaderHelper dataLoaderHelper;
+
+  @Rule
+  public TestWatcher watchman = new TestWatcher()
+  {
+    @Override
+    public void starting(Description d)
+    {
+      LOG.info("RUNNING %s", d.getDisplayName());
+    }
+
+    @Override
+    public void failed(Throwable e, Description d)
+    {
+      LOG.error("FAILED %s", d.getDisplayName());
+    }
+
+    @Override
+    public void finished(Description d)
+    {
+      LOG.info("FINISHED %s", d.getDisplayName());
+    }
+  };
+
+  /**
+   * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value.
+   */
+  protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource)
+  {
+    String fileString;
+    try {
+      InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath);
+      fileString = IOUtils.toString(is, StandardCharsets.UTF_8);
+    }
+    catch (IOException e) {
+      throw new ISE(e, "could not read query file: %s", filePath);
+    }
+
+    fileString = StringUtils.replace(
+        fileString,
+        "%%DATASOURCE%%",
+        datasource
+    );
+
+    return fileString;
+  }
+
+  /**
+   * Reads native queries from a file and runs against the provided datasource.
+   */
+  protected void doTestQuery(String dataSource, String queryFilePath)
+  {
+    try {
+      String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource);
+      queryHelper.testQueriesFromString(query);
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while running test query at path " + queryFilePath);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.
+   */
+  protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception
+  {
+    LOG.info("SqlTask - \n %s", sqlTask);
+
+    // Submit the tasks and wait for the datasource to get loaded
+    msqHelper.submitMsqTaskAndWaitForCompletion(
+        sqlTask,
+        msqContext
+    );
+
+    dataLoaderHelper.waitUntilDatasourceIsReady(datasource);
+  }
+
+  /**
+   * Sumits a sqlTask, waits for task completion.
+   */
+  protected void submitMSQTaskFromFile(String sqlFilePath, String datasource, Map<String, Object> msqContext) throws Exception
+  {
+    String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource);
+    submitMSQTask(sqlTask, datasource, msqContext);
+  }
+
+  /**
+   * Runs a SQL ingest test.
+   *
+   * @param  sqlFilePath path of file containing the sql query.
+   * @param  queryFilePath path of file containing the native test queries to be run on the ingested datasource.
+   * @param  datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value.
+   * @param  msqContext context parameters to be passed with MSQ API call.
+   */
+  protected void runMSQTaskandTestQueries(String sqlFilePath,
+                                          String queryFilePath,
+                                          String datasource,
+                                          Map<String, Object> msqContext) throws Exception
+  {
+    LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath);
+
+    submitMSQTaskFromFile(sqlFilePath, datasource, msqContext);
+    doTestQuery(datasource, queryFilePath);
+  }
+
+  /**
+   * Runs a reindex SQL ingest test.
+   * Same as runMSQTaskandTestQueries, but replaces both %%DATASOURCE%% and %%REINDEX_DATASOURCE%% in the SQL Task.
+   */
+  protected void runReindexMSQTaskandTestQueries(String sqlFilePath,
+                                                 String queryFilePath,
+                                                 String datasource,
+                                                 String reindexDatasource,
+                                                 Map<String, Object> msqContext) throws Exception
+  {
+    LOG.info("Starting Reindex MSQ test for [%s, %s]", sqlFilePath, 
queryFilePath);

Review Comment:
   See comments above for function signature, log line.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.utils.S3TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static junit.framework.Assert.fail;
+
+
+public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static String indexDatasource;
+  private static S3TestUtil s3;
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static List<String> fileList()
+  {
+    return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3);
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    List<String> filesToUpload = new ArrayList<>();
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      filesToUpload.add(localPath + file);
+    }
+    try {
+      s3 = new S3TestUtil();
+      s3.uploadDataFilesToS3(filesToUpload);

Review Comment:
   Does this take a list of `File` objects as well as a list of strings? If only a list of strings, then, sigh, Amazon is non-standard and the existing code is adequate, even if non-standard.
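   
   If it is strings-only, a thin `File`-based variant on our side would at least keep call sites standard Java. A rough sketch (hypothetical method added to `S3TestUtil`; it can't be a same-name overload since `List<File>` and `List<String>` erase to the same signature):
   
   ```java
   // Hypothetical File-based variant: delegate to the existing String-based method.
   public void uploadDataFiles(List<File> files)
   {
     uploadDataFilesToS3(files.stream().map(File::getPath).collect(Collectors.toList()));
   }
   ```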



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.utils.S3TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static junit.framework.Assert.fail;
+
+
+public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static String indexDatasource;
+  private static S3TestUtil s3;
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static List<String> fileList()
+  {
+    return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3);
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    List<String> filesToUpload = new ArrayList<>();
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      filesToUpload.add(localPath + file);
+    }
+    try {
+      s3 = new S3TestUtil();
+      s3.uploadDataFilesToS3(filesToUpload);
+    }
+    catch (Exception e) {
+      // Fail if exception
+      fail();
+    }
+  }
+
+  @After
+  public void deleteSegmentsFromS3()
+  {
+    // Deleting folder created for storing segments (by druid) after test is completed
+    s3.deleteFolderFromS3(indexDatasource);
+  }
+
+  @AfterClass
+  public static void deleteDataFilesFromS3()
+  {
+    // Deleting uploaded data files
+    s3.deleteFilesFromS3(fileList());
+  }
+
+  void doTest(
+      Pair<String, List> s3InputSource,
+      Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+  ) throws Exception
+  {
+    indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
+    try (
+        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
+    ) {
+      final Function<String, String> s3PropsTransform = spec -> {
+        try {
+          String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs);
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%BUCKET%%",
+              config.getCloudBucket()
+          );
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%PATH%%",
+              config.getCloudPath()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT_TYPE%%",
+              InputFormatDetails.JSON.getInputFormatType()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%PARTITIONS_SPEC%%",
+              jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_TYPE%%",
+              "s3"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_KEY%%",
+              s3InputSource.lhs
+          );
+          return StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_VALUE%%",
+              inputSourceValue
+          );
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          indexDatasource,
+          INDEX_TASK,
+          s3PropsTransform,
+          INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true,
+          segmentAvailabilityConfirmationPair
+      );
+    }
+  }
+
+  public void doMSQTest(Pair<String, List> s3InputSource,
+                                         String IngestSQLFilePath,
+                                         String TestQueriesFilePath)
+  {
+    try {
+      indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
+      String sqlTask = getStringFromFileAndReplaceDatasource(IngestSQLFilePath, indexDatasource);
+      String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs);
+      Map<String, Object> context = ImmutableMap.of("finalizeAggregations", false,
+                                                    "maxNumTasks", 5,
+                                                    "groupByEnableMultiValueUnnesting", false);
+
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%INPUT_SOURCE_PROPERTY_KEY%%",
+          s3InputSource.lhs
+      );
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%INPUT_SOURCE_PROPERTY_VALUE%%",
+          inputSourceValue
+      );
+
+      // Setting the correct object path in the sqlTask.
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%BUCKET%%",
+          config.getCloudBucket() // Getting from DRUID_CLOUD_BUCKET env variable
+      );
+      sqlTask = StringUtils.replace(
+          sqlTask,
+          "%%PATH%%",
+          config.getCloudPath() // Getting from DRUID_CLOUD_PATH env variable
+      );
+
+      submitMSQTask(sqlTask, indexDatasource, context);
+
+      // Verifying ingested datasource
+      doTestQuery(indexDatasource, TestQueriesFilePath);
+
+    }
+    catch (Exception e) {
+      LOG.error(e, "Error while testing [%s] with s3 input source property key 
[%s]",
+                IngestSQLFilePath, s3InputSource.lhs);

Review Comment:
   Nit: in Java, names that start with a capital letter are classes. If this is a constant, then `INGEST_SQL_FILE_PATH`. If a variable, `ingestSqlFilePath`.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import junitparams.Parameters;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET -    s3 bucket name (value to be set in druid.storage.bucket)
+ * DRUID_CLOUD_PATH -      path inside the bucket where the test data files will be uploaded
+ *                         (this will also be used as druid.storage.baseKey for s3 deep storage setup)
+ * <p>
+ * The AWS key, secret and region should be set in AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively.
+ * <p>
+ * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a>

Review Comment:
   Great info, thanks!



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.utils.S3TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static junit.framework.Assert.fail;
+
+
+public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static String indexDatasource;
+  private static S3TestUtil s3;
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static List<String> fileList()
+  {
+    return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3);
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    List<String> filesToUpload = new ArrayList<>();
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      filesToUpload.add(localPath + file);

Review Comment:
   This is rather non-standard Java. For local files:
   
   ```java
       List<File> filesToUpload = new ArrayList<>();
       File localPath = new File("resources/data/batch_index/json/");
       for (String file : fileList()) {
         filesToUpload.add(new File(localPath, file));
    }
   ```
   
   If you want to be fancy:
   
   ```java
    List<File> filesToUpload = fileList().stream()
        .map(file -> new File(localPath, file))
        .collect(Collectors.toList());
   ```
   
   I myself find the loop easier to understand, but others on the team prefer the fancy approach.



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.utils.S3TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static junit.framework.Assert.fail;
+
+
+public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static String indexDatasource;
+  private static S3TestUtil s3;
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static List<String> fileList()
+  {
+    return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3);
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    List<String> filesToUpload = new ArrayList<>();
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      filesToUpload.add(localPath + file);
+    }
+    try {
+      s3 = new S3TestUtil();
+      s3.uploadDataFilesToS3(filesToUpload);
+    }
+    catch (Exception e) {
+      // Fail if exception
+      fail();
+    }
+  }
+
+  @After
+  public void deleteSegmentsFromS3()
+  {
+    // Deleting folder created for storing segments (by druid) after test is completed
+    s3.deleteFolderFromS3(indexDatasource);
+  }
+
+  @AfterClass
+  public static void deleteDataFilesFromS3()
+  {
+    // Deleting uploaded data files
+    s3.deleteFilesFromS3(fileList());
+  }
+
+  void doTest(
+      Pair<String, List> s3InputSource,
+      Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+  ) throws Exception
+  {
+    indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
+    try (
+        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
+    ) {
+      final Function<String, String> s3PropsTransform = spec -> {

Review Comment:
   Suggestion: pull this out as a real function so it can be more easily tested. It is awkward to do the rewrites only as part of job submission.
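   
   For instance, a rough sketch (hypothetical method name; only some of the replacements shown):
   
   ```java
   // Hypothetical extraction of the lambda body: the same %%...%% rewrites,
   // but directly callable from a unit test without submitting a job.
   private String applyS3SpecReplacements(String spec, Pair<String, List> s3InputSource) throws Exception
   {
     String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs);
     inputSourceValue = StringUtils.replace(inputSourceValue, "%%BUCKET%%", config.getCloudBucket());
     inputSourceValue = StringUtils.replace(inputSourceValue, "%%PATH%%", config.getCloudPath());
     spec = StringUtils.replace(spec, "%%INPUT_SOURCE_PROPERTY_KEY%%", s3InputSource.lhs);
     return StringUtils.replace(spec, "%%INPUT_SOURCE_PROPERTY_VALUE%%", inputSourceValue);
   }
   ```
   
   `doTest()` would then pass `spec -> applyS3SpecReplacements(spec, s3InputSource)` (wrapping the checked exception, as today), and a test could exercise the rewrites on a sample spec directly.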



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.utils.S3TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static junit.framework.Assert.fail;
+
+
+public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static String indexDatasource;
+  private static S3TestUtil s3;
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static List<String> fileList()
+  {
+    return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3);
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    List<String> filesToUpload = new ArrayList<>();
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      filesToUpload.add(localPath + file);
+    }
+    try {
+      s3 = new S3TestUtil();
+      s3.uploadDataFilesToS3(filesToUpload);
+    }
+    catch (Exception e) {
+      // Fail if exception
+      fail();
+    }
+  }
+
+  @After
+  public void deleteSegmentsFromS3()
+  {
+    // Deleting folder created for storing segments (by druid) after test is completed
+    s3.deleteFolderFromS3(indexDatasource);
+  }
+
+  @AfterClass
+  public static void deleteDataFilesFromS3()
+  {
+    // Deleting uploaded data files
+    s3.deleteFilesFromS3(fileList());
+  }
+
+  void doTest(
+      Pair<String, List> s3InputSource,
+      Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+  ) throws Exception
+  {
+    indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
+    try (
+        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
+    ) {
+      final Function<String, String> s3PropsTransform = spec -> {
+        try {
+          String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs);
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%BUCKET%%",
+              config.getCloudBucket()
+          );
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%PATH%%",
+              config.getCloudPath()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT_TYPE%%",
+              InputFormatDetails.JSON.getInputFormatType()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%PARTITIONS_SPEC%%",
+              jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_TYPE%%",
+              "s3"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_KEY%%",
+              s3InputSource.lhs
+          );
+          return StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_VALUE%%",
+              inputSourceValue
+          );
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          indexDatasource,
+          INDEX_TASK,
+          s3PropsTransform,
+          INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true,
+          segmentAvailabilityConfirmationPair
+      );
+    }
+  }
+
+  public void doMSQTest(Pair<String, List> s3InputSource,

Review Comment:
   Here and elsewhere: it would be great to include a Javadoc comment explaining what this does. Yes, we can figure it out, but since you wrote it, you could save us time by explaining your intent for this function. That is, does it just run? Does it run and check the results? Where are the input files? What parameters should we put into those files? Where do the values come from? Some of this can go at the top of the test; if it is shared across a suite of tests, please put it in the base class.
   
   Explaining here is best. There should also be some basic info in the doc files, which can point here for details.
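   
   For example, something along these lines (my guess at the intent, so please correct; param names lowercased per the naming nit):
   
   ```java
   /**
    * Runs an MSQ ingestion using the SQL in the given file against an S3 input
    * source, waits for the resulting datasource to become ready, then runs the
    * native queries in the given file to verify the ingested data.
    *
    * @param s3InputSource       input source property key ("uris", "prefixes",
    *                            or "objects") paired with its list of values
    * @param ingestSqlFilePath   resource path of the SQL ingest statement;
    *                            %%BUCKET%% and %%PATH%% are filled in from the
    *                            DRUID_CLOUD_BUCKET and DRUID_CLOUD_PATH env vars
    * @param testQueriesFilePath resource path of the native verification queries
    */
   ```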



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.utils.S3TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static junit.framework.Assert.fail;
+
+
+public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest
+{
+  private static String indexDatasource;
+  private static S3TestUtil s3;
+  private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json";
+  private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
+  private static final String INPUT_SOURCE_URIS_KEY = "uris";
+  private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes";
+  private static final String INPUT_SOURCE_OBJECTS_KEY = "objects";
+  private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json";
+  private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json";
+  private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json";
+
+  public static Object[][] resources()
+  {
+    return new Object[][]{
+        {new Pair<>(INPUT_SOURCE_URIS_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2,
+                        "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_PREFIXES_KEY,
+                    ImmutableList.of(
+                        "s3://%%BUCKET%%/%%PATH%%/"
+                    )
+        )},
+        {new Pair<>(INPUT_SOURCE_OBJECTS_KEY,
+                    ImmutableList.of(
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2),
+                        ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3)
+                    )
+        )}
+    };
+  }
+
+  public static List<String> fileList()
+  {
+    return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3);
+  }
+
+  @BeforeClass
+  public static void uploadDataFilesToS3()
+  {
+    List<String> filesToUpload = new ArrayList<>();
+    String localPath = "resources/data/batch_index/json/";
+    for (String file : fileList()) {
+      filesToUpload.add(localPath + file);
+    }
+    try {
+      s3 = new S3TestUtil();
+      s3.uploadDataFilesToS3(filesToUpload);
+    }
+    catch (Exception e) {
+      // Fail if exception
+      fail();
+    }
+  }
+
+  @After
+  public void deleteSegmentsFromS3()
+  {
+    // Deleting folder created for storing segments (by druid) after test is completed
+    s3.deleteFolderFromS3(indexDatasource);
+  }
+
+  @AfterClass
+  public static void deleteDataFilesFromS3()
+  {
+    // Deleting uploaded data files
+    s3.deleteFilesFromS3(fileList());
+  }
+
+  void doTest(
+      Pair<String, List> s3InputSource,
+      Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair
+  ) throws Exception
+  {
+    indexDatasource = "wikipedia_index_test_" + UUID.randomUUID();
+    try (
+        final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix());
+    ) {
+      final Function<String, String> s3PropsTransform = spec -> {
+        try {
+          String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs);
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%BUCKET%%",
+              config.getCloudBucket()
+          );
+          inputSourceValue = StringUtils.replace(
+              inputSourceValue,
+              "%%PATH%%",
+              config.getCloudPath()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_FORMAT_TYPE%%",
+              InputFormatDetails.JSON.getInputFormatType()
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%PARTITIONS_SPEC%%",
+              jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null))
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_TYPE%%",
+              "s3"
+          );
+          spec = StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_KEY%%",
+              s3InputSource.lhs
+          );
+          return StringUtils.replace(
+              spec,
+              "%%INPUT_SOURCE_PROPERTY_VALUE%%",
+              inputSourceValue
+          );
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      };
+
+      doIndexTest(
+          indexDatasource,
+          INDEX_TASK,
+          s3PropsTransform,
+          INDEX_QUERIES_RESOURCE,
+          false,
+          true,
+          true,
+          segmentAvailabilityConfirmationPair
+      );
+    }
+  }
+
+  public void doMSQTest(Pair<String, List> s3InputSource,
+                                         String IngestSQLFilePath,
+                                         String TestQueriesFilePath)

Review Comment:
   Nit: method signature format.
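   
   I.e., with the formatting convention from the earlier comment (and the lowercase parameter names from the naming nit):
   
   ```java
     public void doMSQTest(
         Pair<String, List> s3InputSource,
         String ingestSqlFilePath,
         String testQueriesFilePath
     )
   ```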



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java:
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.testing.utils.S3TestUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static junit.framework.Assert.fail;
+
+
+public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest

Review Comment:
   A Javadoc comment would be helpful to explain what this does. The original didn't have one, but now that you've figured it out, it would be good to explain the purpose to the rest of us.
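   
   Even a couple of sentences helps. A rough sketch of my reading of the class (please adjust):
   
   ```java
   /**
    * Base class for parallel indexing tests that ingest the Wikipedia sample
    * data from S3, parameterized over how the input is specified: "uris",
    * "prefixes", or "objects". Data files are uploaded to S3 before the tests
    * and removed afterward, along with the segments Druid wrote.
    */
   ```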



##########
integration-tests/src/main/java/org/apache/druid/testing/utils/S3TestUtil.java:
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class S3TestUtil

Review Comment:
   I agree. @cryptoe, you created such an API for Druid's internal use. Can we
reuse that here? Create a test-specific wrapper? The ideal state would be that
there is an `ObjectStoreClient` interface which provides the upload, delete,
etc. methods. @cryptoe, can you help @abhagraw define such a test client? Doing
so goes a bit beyond the "convert the old IT" task and would benefit from some
dev help. 
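Roughly this shape, say — purely illustrative, the names and signatures below
are hypothetical rather than an existing Druid API:

```java
import java.io.File;
import java.util.List;

public interface ObjectStoreClient
{
  /** Uploads the given local files under the given path prefix. */
  void uploadFiles(String pathPrefix, List<File> files);

  /** Deletes the objects at the given keys, ignoring keys that don't exist. */
  void deleteObjects(List<String> keys);
}
```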



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.indexer.AbstractS3InputSourceParallelIndexTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET -    s3 Bucket to store in (value to be set in druid.storage.bucket)
+ * DRUID_CLOUD_PATH -      path inside the bucket where the test data files will be uploaded
+ *                         (this will also be used as druid.storage.baseKey for s3 deep storage setup)
+ * <p>
+ * The AWS key, secret and region should be set in
+ * AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively.
+ * <p>
+ * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a>

Review Comment:
   This comment is duplicated, which is perhaps OK. The commonality is the 
`S3DeepStorage` test category. Perhaps it makes sense to move the documentation 
there for all tests that run with that configuration?
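For example — a sketch, assuming `S3DeepStorage` is a plain marker class, as
JUnit categories usually are:

```java
package org.apache.druid.testsEx.categories;

/**
 * Marker category for tests that run against S3 deep storage. These tests
 * require DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID,
 * AWS_SECRET_ACCESS_KEY and AWS_REGION to be set in the environment.
 */
public class S3DeepStorage
{
}
```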



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.indexer.AbstractS3InputSourceParallelIndexTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET -    s3 Bucket to store in (value to be set in druid.storage.bucket)
+ * DRUID_CLOUD_PATH -      path inside the bucket where the test data files will be uploaded
+ *                         (this will also be used as druid.storage.baseKey for s3 deep storage setup)
+ * <p>
+ * The AWS key, secret and region should be set in
+ * AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively.
+ * <p>
+ * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a>
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractS3InputSourceParallelIndexTest

Review Comment:
   As noted earlier, a brief comment would be helpful to us reviewers. I gather 
this is the MSQ ingestion test, but using S3 as deep storage? Also as the 
source of the files?
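Even a couple of lines would do — something like this, if my reading is right
(the description below is a guess, so please correct it):

```java
/**
 * MSQ (SQL-based) ingestion test that reads its input files from S3 and
 * writes the resulting segments to S3 deep storage.
 */
```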



##########
integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Inject;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.testsEx.categories.S3DeepStorage;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.apache.druid.testsEx.indexer.AbstractS3InputSourceParallelIndexTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.List;
+
+/**
+ * IMPORTANT:
+ * To run this test, you must set the following env variables in the build environment
+ * DRUID_CLOUD_BUCKET -    s3 Bucket to store in (value to be set in druid.storage.bucket)
+ * DRUID_CLOUD_PATH -      path inside the bucket where the test data files will be uploaded
+ *                         (this will also be used as druid.storage.baseKey for s3 deep storage setup)
+ * <p>
+ * The AWS key, secret and region should be set in
+ * AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively.
+ * <p>
+ * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a>
+ */
+
+@RunWith(DruidTestRunner.class)
+@Category(S3DeepStorage.class)
+public class ITS3SQLBasedIngestionTest extends AbstractS3InputSourceParallelIndexTest
+{
+  @Inject
+  @Json
+  protected ObjectMapper jsonMapper;
+  private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
+  private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";

Review Comment:
   The typical Java standard is to put constants at the top of the file, 
followed by variables. So, move `jsonMapper` after these two constants, with a 
blank line between them.
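That is, something like this — the same declarations from the diff, just
reordered:

```java
private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql";
private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json";

@Inject
@Json
protected ObjectMapper jsonMapper;
```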


