paul-rogers commented on code in PR #13535: URL: https://github.com/apache/druid/pull/13535#discussion_r1052462276
########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java: ########## @@ -91,16 +97,149 @@ public String getFolderSuffix() } } - private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); + public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); - @Inject - protected IntegrationTestingConfig config; @Inject protected SqlTestQueryHelper sqlQueryHelper; @Inject ClientInfoResourceTestClient clientInfoResourceTestClient; + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + protected TestQueryHelper queryHelper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Rule + public TestWatcher watchman = new TestWatcher() + { + @Override + public void starting(Description d) + { + LOG.info("RUNNING %s", d.getDisplayName()); + } + + @Override + public void failed(Throwable e, Description d) + { + LOG.error("FAILED %s", d.getDisplayName()); + } + + @Override + public void finished(Description d) + { + LOG.info("FINISHED %s", d.getDisplayName()); + } + }; + + /** + * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value. + */ + protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource) + { + String fileString; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath); + fileString = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", filePath); + } + + fileString = StringUtils.replace( + fileString, + "%%DATASOURCE%%", + datasource + ); + + return fileString; + } + + /** + * Reads native queries from a file and runs against the provided datasource. 
+ */ + protected void doTestQuery(String dataSource, String queryFilePath) + { + try { + String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource); + queryHelper.testQueriesFromString(query); + } + catch (Exception e) { + LOG.error(e, "Error while running test query at path " + queryFilePath); + throw new RuntimeException(e); + } + } + + /** + * Sumits a sqlTask, waits for task completion. + */ + protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception + { + LOG.info("SqlTask - \n %s", sqlTask); + + // Submit the tasks and wait for the datasource to get loaded + msqHelper.submitMsqTaskAndWaitForCompletion( + sqlTask, + msqContext + ); + + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + } + + /** + * Sumits a sqlTask, waits for task completion. + */ + protected void submitMSQTaskFromFile(String sqlFilePath, String datasource, Map<String, Object> msqContext) throws Exception + { + String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource); + submitMSQTask(sqlTask, datasource, msqContext); + } + + /** + * Runs a SQL ingest test. + * + * @param sqlFilePath path of file containing the sql query. + * @param queryFilePath path of file containing the native test queries to be run on the ingested datasource. + * @param datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value. + * @param msqContext context parameters to be passed with MSQ API call. + */ + protected void runMSQTaskandTestQueries(String sqlFilePath, + String queryFilePath, + String datasource, + Map<String, Object> msqContext) throws Exception Review Comment: Nit: I think Druid prefers the following formatting: ```java protected void runMSQTaskandTestQueries( String sqlFilePath, String queryFilePath, String datasource, Map<String, Object> msqContext ) throws Exception ``` At least, I always get comments if I forget to format things the above way. 
The commenter notes "we haven't figured out how to get Checkstyle to enforce this style, but it is what we want." ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java: ########## @@ -91,16 +97,149 @@ public String getFolderSuffix() } } - private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); + public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); - @Inject - protected IntegrationTestingConfig config; @Inject protected SqlTestQueryHelper sqlQueryHelper; @Inject ClientInfoResourceTestClient clientInfoResourceTestClient; + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + protected TestQueryHelper queryHelper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Rule + public TestWatcher watchman = new TestWatcher() + { + @Override + public void starting(Description d) + { + LOG.info("RUNNING %s", d.getDisplayName()); + } + + @Override + public void failed(Throwable e, Description d) + { + LOG.error("FAILED %s", d.getDisplayName()); + } + + @Override + public void finished(Description d) + { + LOG.info("FINISHED %s", d.getDisplayName()); + } + }; + + /** + * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value. + */ + protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource) + { + String fileString; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath); + fileString = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", filePath); + } + + fileString = StringUtils.replace( + fileString, + "%%DATASOURCE%%", + datasource + ); + + return fileString; + } + + /** + * Reads native queries from a file and runs against the provided datasource.
+ */ + protected void doTestQuery(String dataSource, String queryFilePath) + { + try { + String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource); + queryHelper.testQueriesFromString(query); + } + catch (Exception e) { + LOG.error(e, "Error while running test query at path " + queryFilePath); + throw new RuntimeException(e); + } + } + + /** + * Sumits a sqlTask, waits for task completion. + */ + protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception + { + LOG.info("SqlTask - \n %s", sqlTask); + + // Submit the tasks and wait for the datasource to get loaded + msqHelper.submitMsqTaskAndWaitForCompletion( + sqlTask, + msqContext + ); + + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + } + + /** + * Sumits a sqlTask, waits for task completion. Review Comment: Sumits -> Submits ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java: ########## @@ -91,16 +97,149 @@ public String getFolderSuffix() } } - private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); + public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); - @Inject - protected IntegrationTestingConfig config; @Inject protected SqlTestQueryHelper sqlQueryHelper; @Inject ClientInfoResourceTestClient clientInfoResourceTestClient; + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + protected TestQueryHelper queryHelper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Rule + public TestWatcher watchman = new TestWatcher() + { + @Override + public void starting(Description d) + { + LOG.info("RUNNING %s", d.getDisplayName()); + } + + @Override + public void failed(Throwable e, Description d) + { + LOG.error("FAILED %s", d.getDisplayName()); + } + + @Override + public void finished(Description d) + { + LOG.info("FINISHED %s", d.getDisplayName()); + } + }; + + /** + * Reads file as utf-8 string and 
replace %%DATASOURCE%% with the provide datasource value. + */ + protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource) + { + String fileString; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath); + fileString = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", filePath); + } + + fileString = StringUtils.replace( + fileString, + "%%DATASOURCE%%", + datasource + ); + + return fileString; + } + + /** + * Reads native queries from a file and runs against the provided datasource. + */ + protected void doTestQuery(String dataSource, String queryFilePath) + { + try { + String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource); + queryHelper.testQueriesFromString(query); + } + catch (Exception e) { + LOG.error(e, "Error while running test query at path " + queryFilePath); + throw new RuntimeException(e); + } + } + + /** + * Sumits a sqlTask, waits for task completion. + */ + protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception + { + LOG.info("SqlTask - \n %s", sqlTask); + + // Submit the tasks and wait for the datasource to get loaded + msqHelper.submitMsqTaskAndWaitForCompletion( + sqlTask, + msqContext + ); + + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + } + + /** + * Sumits a sqlTask, waits for task completion. + */ + protected void submitMSQTaskFromFile(String sqlFilePath, String datasource, Map<String, Object> msqContext) throws Exception + { + String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource); + submitMSQTask(sqlTask, datasource, msqContext); + } + + /** + * Runs a SQL ingest test. + * + * @param sqlFilePath path of file containing the sql query. + * @param queryFilePath path of file containing the native test queries to be run on the ingested datasource. 
+ * @param datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value. + * @param msqContext context parameters to be passed with MSQ API call. + */ + protected void runMSQTaskandTestQueries(String sqlFilePath, + String queryFilePath, + String datasource, + Map<String, Object> msqContext) throws Exception + { + LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath); Review Comment: Nit: the `[ ]` thing we use was probably originally for cases where the value is unformatted. Here, it would be cleaner to say: ```java LOG.info("Starting MSQ test for sql path: %s, query path: %s", sqlFilePath, queryFilePath); ``` ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractITBatchIndexTest.java: ########## @@ -91,16 +97,149 @@ public String getFolderSuffix() } } - private static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); + public static final Logger LOG = new Logger(AbstractITBatchIndexTest.class); - @Inject - protected IntegrationTestingConfig config; @Inject protected SqlTestQueryHelper sqlQueryHelper; @Inject ClientInfoResourceTestClient clientInfoResourceTestClient; + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + protected TestQueryHelper queryHelper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Rule + public TestWatcher watchman = new TestWatcher() + { + @Override + public void starting(Description d) + { + LOG.info("RUNNING %s", d.getDisplayName()); + } + + @Override + public void failed(Throwable e, Description d) + { + LOG.error("FAILED %s", d.getDisplayName()); + } + + @Override + public void finished(Description d) + { + LOG.info("FINISHED %s", d.getDisplayName()); + } + }; + + /** + * Reads file as utf-8 string and replace %%DATASOURCE%% with the provide datasource value. 
+ */ + protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource) + { + String fileString; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath); + fileString = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", filePath); + } + + fileString = StringUtils.replace( + fileString, + "%%DATASOURCE%%", + datasource + ); + + return fileString; + } + + /** + * Reads native queries from a file and runs against the provided datasource. + */ + protected void doTestQuery(String dataSource, String queryFilePath) + { + try { + String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource); + queryHelper.testQueriesFromString(query); + } + catch (Exception e) { + LOG.error(e, "Error while running test query at path " + queryFilePath); + throw new RuntimeException(e); + } + } + + /** + * Sumits a sqlTask, waits for task completion. + */ + protected void submitMSQTask(String sqlTask, String datasource, Map<String, Object> msqContext) throws Exception + { + LOG.info("SqlTask - \n %s", sqlTask); + + // Submit the tasks and wait for the datasource to get loaded + msqHelper.submitMsqTaskAndWaitForCompletion( + sqlTask, + msqContext + ); + + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + } + + /** + * Sumits a sqlTask, waits for task completion. + */ + protected void submitMSQTaskFromFile(String sqlFilePath, String datasource, Map<String, Object> msqContext) throws Exception + { + String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource); + submitMSQTask(sqlTask, datasource, msqContext); + } + + /** + * Runs a SQL ingest test. + * + * @param sqlFilePath path of file containing the sql query. + * @param queryFilePath path of file containing the native test queries to be run on the ingested datasource. + * @param datasource name of the datasource. 
%%DATASOURCE%% in the sql and queries will be replaced with this value. + * @param msqContext context parameters to be passed with MSQ API call. + */ + protected void runMSQTaskandTestQueries(String sqlFilePath, + String queryFilePath, + String datasource, + Map<String, Object> msqContext) throws Exception + { + LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath); + + submitMSQTaskFromFile(sqlFilePath, datasource, msqContext); + doTestQuery(datasource, queryFilePath); + } + + /** + * Runs a reindex SQL ingest test. + * Same as runMSQTaskandTestQueries, but replaces both %%DATASOURCE%% and %%REINDEX_DATASOURCE%% in the SQL Task. + */ + protected void runReindexMSQTaskandTestQueries(String sqlFilePath, + String queryFilePath, + String datasource, + String reindexDatasource, + Map<String, Object> msqContext) throws Exception + { + LOG.info("Starting Reindex MSQ test for [%s, %s]", sqlFilePath, queryFilePath); Review Comment: See comments above for function signature, log line. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.utils.S3TestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import static junit.framework.Assert.fail; + + +public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest +{ + private static String indexDatasource; + private static S3TestUtil s3; + private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] resources() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + 
WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static List<String> fileList() + { + return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3); + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + List<String> filesToUpload = new ArrayList<>(); + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + filesToUpload.add(localPath + file); + } + try { + s3 = new S3TestUtil(); + s3.uploadDataFilesToS3(filesToUpload); Review Comment: Does this take a list of `File` objects as well as a list of strings? If only a list of strings, then, sigh, Amazon is non-standard and the existing code is adequate, even if non-standard. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.utils.S3TestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import static junit.framework.Assert.fail; + + +public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest +{ + private static String indexDatasource; + private static S3TestUtil s3; + private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] resources() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + 
WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static List<String> fileList() + { + return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3); + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + List<String> filesToUpload = new ArrayList<>(); + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + filesToUpload.add(localPath + file); + } + try { + s3 = new S3TestUtil(); + s3.uploadDataFilesToS3(filesToUpload); + } + catch (Exception e) { + // Fail if exception + fail(); + } + } + + @After + public void deleteSegmentsFromS3() + { + // Deleting folder created for storing segments (by druid) after test is completed + s3.deleteFolderFromS3(indexDatasource); + } + + @AfterClass + public static void deleteDataFilesFromS3() + { + // Deleting uploaded data files + s3.deleteFilesFromS3(fileList()); + } + + void doTest( + Pair<String, List> s3InputSource, + Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair + ) throws Exception + { + indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); + try ( + final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix()); + ) { + final Function<String, String> s3PropsTransform = spec -> { + try { + String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs); + inputSourceValue = StringUtils.replace( + inputSourceValue, + "%%BUCKET%%", + config.getCloudBucket() + ); + inputSourceValue = StringUtils.replace( + inputSourceValue, + "%%PATH%%", + config.getCloudPath() + ); + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT_TYPE%%", + InputFormatDetails.JSON.getInputFormatType() + ); + spec = StringUtils.replace( + spec, + "%%PARTITIONS_SPEC%%", + jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null)) + ); + spec = 
StringUtils.replace( + spec, + "%%INPUT_SOURCE_TYPE%%", + "s3" + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_PROPERTY_KEY%%", + s3InputSource.lhs + ); + return StringUtils.replace( + spec, + "%%INPUT_SOURCE_PROPERTY_VALUE%%", + inputSourceValue + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + doIndexTest( + indexDatasource, + INDEX_TASK, + s3PropsTransform, + INDEX_QUERIES_RESOURCE, + false, + true, + true, + segmentAvailabilityConfirmationPair + ); + } + } + + public void doMSQTest(Pair<String, List> s3InputSource, + String IngestSQLFilePath, + String TestQueriesFilePath) + { + try { + indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); + String sqlTask = getStringFromFileAndReplaceDatasource(IngestSQLFilePath, indexDatasource); + String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs); + Map<String, Object> context = ImmutableMap.of("finalizeAggregations", false, + "maxNumTasks", 5, + "groupByEnableMultiValueUnnesting", false); + + sqlTask = StringUtils.replace( + sqlTask, + "%%INPUT_SOURCE_PROPERTY_KEY%%", + s3InputSource.lhs + ); + sqlTask = StringUtils.replace( + sqlTask, + "%%INPUT_SOURCE_PROPERTY_VALUE%%", + inputSourceValue + ); + + // Setting the correct object path in the sqlTask. + sqlTask = StringUtils.replace( + sqlTask, + "%%BUCKET%%", + config.getCloudBucket() // Getting from DRUID_CLOUD_BUCKET env variable + ); + sqlTask = StringUtils.replace( + sqlTask, + "%%PATH%%", + config.getCloudPath() // Getting from DRUID_CLOUD_PATH env variable + ); + + submitMSQTask(sqlTask, indexDatasource, context); + + // Verifying ingested datasource + doTestQuery(indexDatasource, TestQueriesFilePath); + + } + catch (Exception e) { + LOG.error(e, "Error while testing [%s] with s3 input source property key [%s]", + IngestSQLFilePath, s3InputSource.lhs); Review Comment: Nit: in Java, names that start with a capital letter are classes. If this is a constant, then `INGEST_SQL_FILE_PATH`. 
If a variable, `ingestSqlFilePath`. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.indexer; + +import junitparams.Parameters; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET - s3 bucket name (value to be set in druid.storage.bucket) + * DRUID_CLOUD_PATH - path inside the bucket where the test data files will be uploaded + * (this will also be used as druid.storage.baseKey for s3 deep storage setup) + * <p> + * The AWS key, secret and region should be set in AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively. 
+ * <p> + * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a> Review Comment: Great info, thanks! ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.utils.S3TestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import static junit.framework.Assert.fail; + + +public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest +{ + private static String indexDatasource; + private static S3TestUtil s3; + private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] resources() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + 
WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static List<String> fileList() + { + return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3); + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + List<String> filesToUpload = new ArrayList<>(); + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + filesToUpload.add(localPath + file); Review Comment: This is rather non-standard Java. For local files: ```java List<File> filesToUpload = new ArrayList<>(); File localPath = new File("resources/data/batch_index/json/"); for (String file : fileList()) { filesToUpload.add(new File(localPath, file)); } ``` If you want to be fancy: ```java List<File> filesToUpload = fileList().stream().map(file -> new File(localPath, file)).collect(Collectors.toList()); ``` I find the loop easier to understand, but others on the team prefer the fancy approach. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.utils.S3TestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import static junit.framework.Assert.fail; + + +public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest +{ + private static String indexDatasource; + private static S3TestUtil s3; + private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] resources() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new 
Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static List<String> fileList() + { + return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3); + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + List<String> filesToUpload = new ArrayList<>(); + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + filesToUpload.add(localPath + file); + } + try { + s3 = new S3TestUtil(); + s3.uploadDataFilesToS3(filesToUpload); + } + catch (Exception e) { + // Fail if exception + fail(); + } + } + + @After + public void deleteSegmentsFromS3() + { + // Deleting folder created for storing segments (by druid) after test is completed + s3.deleteFolderFromS3(indexDatasource); + } + + @AfterClass + public static void deleteDataFilesFromS3() + { + // Deleting uploaded data files + s3.deleteFilesFromS3(fileList()); + } + + void doTest( + Pair<String, List> s3InputSource, + Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair + ) throws Exception + { + indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); + try ( + final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix()); + ) { + final Function<String, String> s3PropsTransform = spec -> { Review Comment: Suggestion: pull this out as a real function so it can be more easily tested. It is awkward to do the rewrites only as part of job submission. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.utils.S3TestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import static junit.framework.Assert.fail; + + +public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest +{ + private static String indexDatasource; + private static S3TestUtil s3; + private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String 
WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] resources() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static List<String> fileList() + { + return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3); + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + List<String> filesToUpload = new ArrayList<>(); + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + filesToUpload.add(localPath + file); + } + try { + s3 = new S3TestUtil(); + s3.uploadDataFilesToS3(filesToUpload); + } + catch (Exception e) { + // Fail if exception + fail(); + } + } + + @After + public void deleteSegmentsFromS3() + { + // Deleting folder created for storing segments (by druid) after test is completed + s3.deleteFolderFromS3(indexDatasource); + } + + @AfterClass + public static void deleteDataFilesFromS3() + { + // Deleting uploaded data files + s3.deleteFilesFromS3(fileList()); + } + + void doTest( + Pair<String, List> s3InputSource, + Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair + ) throws Exception + { + indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); + try ( + final Closeable 
ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix()); + ) { + final Function<String, String> s3PropsTransform = spec -> { + try { + String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs); + inputSourceValue = StringUtils.replace( + inputSourceValue, + "%%BUCKET%%", + config.getCloudBucket() + ); + inputSourceValue = StringUtils.replace( + inputSourceValue, + "%%PATH%%", + config.getCloudPath() + ); + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT_TYPE%%", + InputFormatDetails.JSON.getInputFormatType() + ); + spec = StringUtils.replace( + spec, + "%%PARTITIONS_SPEC%%", + jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null)) + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_TYPE%%", + "s3" + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_PROPERTY_KEY%%", + s3InputSource.lhs + ); + return StringUtils.replace( + spec, + "%%INPUT_SOURCE_PROPERTY_VALUE%%", + inputSourceValue + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + doIndexTest( + indexDatasource, + INDEX_TASK, + s3PropsTransform, + INDEX_QUERIES_RESOURCE, + false, + true, + true, + segmentAvailabilityConfirmationPair + ); + } + } + + public void doMSQTest(Pair<String, List> s3InputSource, Review Comment: Here and elsewhere: it would be great to include a Javadoc comment explaining what this does. Yes, we can figure it out. But, since you wrote it, you could save us time by explaining what your intent is for this function. That is, does it just run? Does it run and check the results? Where are the input files? What are the parameters we should put into those files? Where do the values come from? Some of this can be at the top of the test. If it is shared across a suite of tests, please put it in the base class. Explaining here is best. There should also be some basic info in the doc files, which can point here for details.
########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.utils.S3TestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import static junit.framework.Assert.fail; + + +public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest +{ + private static String indexDatasource; + private static S3TestUtil s3; + private static final String INDEX_TASK = "/indexer/wikipedia_cloud_index_task.json"; + private static final String INDEX_QUERIES_RESOURCE = 
"/indexer/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] resources() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static List<String> fileList() + { + return Arrays.asList(WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3); + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + List<String> filesToUpload = new ArrayList<>(); + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + filesToUpload.add(localPath + file); + } + try { + s3 = new S3TestUtil(); + s3.uploadDataFilesToS3(filesToUpload); + } + catch (Exception e) { + // Fail if exception + fail(); + } + } + + @After + public void deleteSegmentsFromS3() + { + // Deleting folder created for storing segments (by druid) after test is completed + s3.deleteFolderFromS3(indexDatasource); + } + + @AfterClass + public static void deleteDataFilesFromS3() + { + // Deleting uploaded data files + 
s3.deleteFilesFromS3(fileList()); + } + + void doTest( + Pair<String, List> s3InputSource, + Pair<Boolean, Boolean> segmentAvailabilityConfirmationPair + ) throws Exception + { + indexDatasource = "wikipedia_index_test_" + UUID.randomUUID(); + try ( + final Closeable ignored1 = unloader(indexDatasource + config.getExtraDatasourceNameSuffix()); + ) { + final Function<String, String> s3PropsTransform = spec -> { + try { + String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs); + inputSourceValue = StringUtils.replace( + inputSourceValue, + "%%BUCKET%%", + config.getCloudBucket() + ); + inputSourceValue = StringUtils.replace( + inputSourceValue, + "%%PATH%%", + config.getCloudPath() + ); + spec = StringUtils.replace( + spec, + "%%INPUT_FORMAT_TYPE%%", + InputFormatDetails.JSON.getInputFormatType() + ); + spec = StringUtils.replace( + spec, + "%%PARTITIONS_SPEC%%", + jsonMapper.writeValueAsString(new DynamicPartitionsSpec(null, null)) + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_TYPE%%", + "s3" + ); + spec = StringUtils.replace( + spec, + "%%INPUT_SOURCE_PROPERTY_KEY%%", + s3InputSource.lhs + ); + return StringUtils.replace( + spec, + "%%INPUT_SOURCE_PROPERTY_VALUE%%", + inputSourceValue + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + }; + + doIndexTest( + indexDatasource, + INDEX_TASK, + s3PropsTransform, + INDEX_QUERIES_RESOURCE, + false, + true, + true, + segmentAvailabilityConfirmationPair + ); + } + } + + public void doMSQTest(Pair<String, List> s3InputSource, + String IngestSQLFilePath, + String TestQueriesFilePath) Review Comment: Nit: method signature format. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/AbstractS3InputSourceParallelIndexTest.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.indexer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testing.utils.S3TestUtil; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; + +import static junit.framework.Assert.fail; + + +public abstract class AbstractS3InputSourceParallelIndexTest extends AbstractITBatchIndexTest Review Comment: A Javadoc comment would be helpful to explain what this does. The original didn't have one, but now that you've figured it out, would be good to explain the purpose to the rest of us. ########## integration-tests/src/main/java/org/apache/druid/testing/utils/S3TestUtil.java: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.utils; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.druid.java.util.common.logger.Logger; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class S3TestUtil Review Comment: I agree. @cryptoe, you created such an API for Druid's internal use. Can we reuse that here? Create a test-specific wrapper? The ideal state would be that there is an `ObjectStoreClient` interface which provides the upload, delete, etc. methods. @cryptoe, can you help @abhagraw define such a test client? Doing so goes a bit beyond the "convert the old IT" task and would benefit from some dev help.
########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.indexer.AbstractS3InputSourceParallelIndexTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET - s3 Bucket to store in (value to be set in druid.storage.bucket) + * DRUID_CLOUD_PATH - path inside the bucket where the test data files will be uploaded + * (this will also be used as druid.storage.baseKey for s3 deep storage setup) + * <p> + * The AWS key, secret and 
region should be set in + * AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively. + * <p> + * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a> Review Comment: This comment is duplicated, which is perhaps OK. The commonality is the `S3DeepStorage` test category. Perhaps it makes sense to move the documentation there for all tests that run with that configuration? ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.indexer.AbstractS3InputSourceParallelIndexTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET - s3 Bucket to store in (value to be set in druid.storage.bucket) + * DRUID_CLOUD_PATH - path inside the bucket where the test data files will be uploaded + * (this will also be used as druid.storage.baseKey for s3 deep storage setup) + * <p> + * The AWS key, secret and region should be set in + * AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively. + * <p> + * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a> + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractS3InputSourceParallelIndexTest Review Comment: As noted earlier, a brief comment would be helpful to us reviewers. I gather this is the MSQ ingestion test, but using S3 as deep storage? Also as the source of the files? ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.indexer.AbstractS3InputSourceParallelIndexTest; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET - s3 Bucket to store in (value to be set in druid.storage.bucket) + * DRUID_CLOUD_PATH - path inside the bucket where the test data files will be uploaded + * (this will also be used as druid.storage.baseKey for s3 deep storage setup) + * <p> + * The AWS key, secret and region should be set in + * AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_REGION respectively. 
+ * <p> + * <a href="https://druid.apache.org/docs/latest/development/extensions-core/s3.html">S3 Deep Storage setup in druid</a> + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractS3InputSourceParallelIndexTest +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json"; Review Comment: The typical Java standard is to put constants at the top of the file, followed by variables. So, move `jsonMapper` after these two constants, with a blank line between them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org