paul-rogers commented on code in PR #13535: URL: https://github.com/apache/druid/pull/13535#discussion_r1047483660
########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.indexer; + +import junitparams.Parameters; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; + +/** + * IMPORTANT: + * To run this test, you must: + * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and + * -Ddruid.test.config.cloudPath or setting "cloud_bucket" and "cloud_path" in the config file. + * 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json + * located in integration-tests/src/test/resources/data/batch_index/json to your S3 at the location set in step 1. + * 3) Provide -Doverride.config.path=<PATH_TO_FILE> with s3 credentials/configs set. See Review Comment: I'm not sure if the new ITs implement this feature. 
If they did, we have no good way in the new ITs to pass this information into the test, as we use the same Java command line for all tests. In the new IT framework, we prefer to use env vars, not command line options. I wonder, are all these comments just copy/pasted from old tests without adjusting them for the new IT framework? If so, we're doing future readers a disservice by providing incorrect info. If we don't have time to provide correct comments, just remove the comments and let the developer rediscover how things work without being sent down the wrong path. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.indexer; + +import junitparams.Parameters; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; + +/** + * IMPORTANT: + * To run this test, you must: + * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and Review Comment: Is this comment accurate? We discussed setting the env var directly. For the above to work, something would have to pass the flags to the test process, but the new ITs are not set up to do that. ########## integration-tests-ex/cases/cluster/S3DeepStorage/docker-compose.yaml: ########## @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +networks: + druid-it-net: + name: druid-it-net + ipam: + config: + - subnet: 172.172.172.0/24 + +services: + zookeeper: + extends: + file: ../Common/dependencies.yaml + service: zookeeper + + metadata: + extends: + file: ../Common/dependencies.yaml + service: metadata + + coordinator: + extends: + file: ../Common/druid.yaml + service: coordinator + container_name: coordinator + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + # The frequency with which the coordinator polls the database + # for changes. The DB population code has to wait at least this + # long for the coordinator to notice changes. + - druid_manager_segments_pollDuration=PT5S + - druid_coordinator_period=PT10S + - AWS_REGION=${AWS_REGION} + - druid_s3_accessKey=${AWS_ACCESS_KEY_ID} + - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY} + depends_on: + - zookeeper + - metadata + + overlord: + extends: + file: ../Common/druid.yaml + service: overlord + container_name: overlord + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + - AWS_REGION=${AWS_REGION} + - druid_s3_accessKey=${AWS_ACCESS_KEY_ID} + - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY} + depends_on: + - zookeeper + - metadata + + broker: + extends: + file: ../Common/druid.yaml + service: broker + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + - AWS_REGION=${AWS_REGION} + - druid_s3_accessKey=${AWS_ACCESS_KEY_ID} + - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY} + depends_on: + - zookeeper + + router: + extends: + file: ../Common/druid.yaml + service: router + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + - AWS_REGION=${AWS_REGION} + - druid_s3_accessKey=${AWS_ACCESS_KEY_ID} + - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY} + depends_on: + - zookeeper + + historical: + extends: + file: ../Common/druid.yaml + service: historical + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + - 
AWS_REGION=${AWS_REGION} + - druid_s3_accessKey=${AWS_ACCESS_KEY_ID} + - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY} + depends_on: + - zookeeper + + indexer: + extends: + file: ../Common/druid.yaml + service: indexer + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + - druid_test_loadList=druid-s3-extensions + - druid_storage_type=s3 + - druid_storage_bucket=${DRUID_CLOUD_BUCKET} + # Using DRUID_CLOUD_PATH env as baseKey as well. + - druid_storage_baseKey=${DRUID_CLOUD_PATH} + - druid_s3_accessKey=${AWS_ACCESS_KEY_ID} + - druid_s3_secretKey=${AWS_SECRET_ACCESS_KEY} Review Comment: Let's document the required env vars somewhere. Perhaps we add a Markdown page that lists the requirements for each test. Or, we document these at the top of the Java file. Be sure to list the env vars used by the server vs. those used by the test. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/indexer/ITS3ToS3ParallelIndexTest.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.indexer; + +import junitparams.Parameters; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.List; + +/** + * IMPORTANT: + * To run this test, you must: + * 1) Set the bucket and path for your data. This can be done by setting -Ddruid.test.config.cloudBucket and + * -Ddruid.test.config.cloudPath or setting "cloud_bucket" and "cloud_path" in the config file. + * 2) Copy wikipedia_index_data1.json, wikipedia_index_data2.json, and wikipedia_index_data3.json + * located in integration-tests/src/test/resources/data/batch_index/json to your S3 at the location set in step 1. Review Comment: A comment somewhere said that the test itself is doing this step. If this test were run as part of a build, we'd have to insert this step into the script which launches the test. Did we do so? ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = 
"/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static AmazonS3 s3Client() + { + AWSCredentials credentials = new BasicAWSCredentials( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY") Review Comment: A common error will be to leave one or both of these unset. Even though this is a test, and we want to go fast, future users will appreciate it if we check if these values are set and give a good error if not set. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import 
java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static AmazonS3 s3Client() + { + AWSCredentials credentials = new BasicAWSCredentials( + 
System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY") + ); + return AmazonS3ClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withRegion(System.getenv("AWS_REGION")) Review Comment: Again, check the value and give an error. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = 
"/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() Review Comment: This seems a clone of the `resources` in `AbstractS3InputSourceParallelIndexTest`. Even though we want to move fast, it is still helpful to follow good practices and factor out common code. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = 
"/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static AmazonS3 s3Client() + { + AWSCredentials credentials = new BasicAWSCredentials( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY") + ); + return AmazonS3ClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withRegion(System.getenv("AWS_REGION")) + .build(); + } + + public static String[] fileList() + { + return new String[] { + WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3 + }; + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + s3Client.putObject( + System.getenv("DRUID_CLOUD_BUCKET"), + System.getenv("DRUID_CLOUD_PATH") + "/" + file, + new File(localPath + file) + ); + } + } + + 
@AfterClass + public static void deleteFilesFromS3() + { + // Delete uploaded data files + DeleteObjectsRequest delObjReq = new DeleteObjectsRequest(System.getenv("DRUID_CLOUD_BUCKET")) + .withKeys(fileList()); + s3Client.deleteObjects(delObjReq); + + // Delete segments created by druid + ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(System.getenv("DRUID_CLOUD_BUCKET")) + .withPrefix(System.getenv("DRUID_CLOUD_PATH") + "/" + datasource + "/"); Review Comment: See comments above. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion Review Comment: Some explanation would be helpful. I see code to upload and delete files. There seems to be a step to run an MSQ query. Not sure if there is a step to validate the results. Please add a note to explain what this test does. I, as a reviewer, don't want to have to reverse-engineer the intent from the code. 
########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import 
org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + 
ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static AmazonS3 s3Client() + { + AWSCredentials credentials = new BasicAWSCredentials( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY") + ); + return AmazonS3ClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withRegion(System.getenv("AWS_REGION")) + .build(); + } + + public static String[] fileList() + { + return new String[] { + WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3 + }; + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + s3Client.putObject( + System.getenv("DRUID_CLOUD_BUCKET"), + System.getenv("DRUID_CLOUD_PATH") + "/" + file, + new File(localPath + file) + ); + } + } + + @AfterClass + public static void deleteFilesFromS3() + { + // Delete uploaded data files + DeleteObjectsRequest delObjReq = new DeleteObjectsRequest(System.getenv("DRUID_CLOUD_BUCKET")) + .withKeys(fileList()); + s3Client.deleteObjects(delObjReq); + + // Delete segments created by druid + ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(System.getenv("DRUID_CLOUD_BUCKET")) + .withPrefix(System.getenv("DRUID_CLOUD_PATH") + "/" + datasource + "/"); + + ObjectListing objectListing = s3Client.listObjects(listObjectsRequest); + + while (true) { + for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { + s3Client.deleteObject(System.getenv("DRUID_CLOUD_BUCKET"), objectSummary.getKey()); + } + if (objectListing.isTruncated()) { + objectListing = s3Client.listNextBatchOfObjects(objectListing); + } else { + break; + } + } + } + + @Test + @Parameters(method = "test_cases") + @TestCaseName("Test_{index} ({0})") + public void testSQLBasedBatchIngestion(Pair<String, 
List> s3InputSource) + { + try { + String sqlTask = getStringFromFileAndReplaceDatasource(CLOUD_INGEST_SQL, datasource); + String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs); + Map<String, Object> context = ImmutableMap.of("finalizeAggregations", false, + "maxNumTasks", 5, + "groupByEnableMultiValueUnnesting", false); + + sqlTask = StringUtils.replace( + sqlTask, + "%%INPUT_SOURCE_PROPERTY_KEY%%", + s3InputSource.lhs + ); + sqlTask = StringUtils.replace( + sqlTask, + "%%INPUT_SOURCE_PROPERTY_VALUE%%", + inputSourceValue + ); + + // Setting the correct object path in the sqlTask. + sqlTask = StringUtils.replace( + sqlTask, + "%%BUCKET%%", + config.getCloudBucket() // Getting from DRUID_CLOUD_BUCKET env variable + ); + sqlTask = StringUtils.replace( + sqlTask, + "%%PATH%%", + config.getCloudPath() // Getting from DRUID_CLOUD_PATH env variable + ); + + submitTask(sqlTask, datasource, context); + doTestQuery(INDEX_QUERIES_FILE, datasource); Review Comment: Does this verify the results by running queries against the uploaded data? If so, please add a comment to note this. If this does not verify results, then we need to add a step to do so. Else, this is not much of a test since a simple direct return from `submitTask()` (while doing nothing) would count as a success. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) 
+public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = "/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static AmazonS3 s3Client() + { + AWSCredentials credentials = new BasicAWSCredentials( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY") + ); + return AmazonS3ClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withRegion(System.getenv("AWS_REGION")) + .build(); + } + + public static String[] fileList() + { + 
return new String[] { + WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3 + }; + } + + @BeforeClass + public static void uploadDataFilesToS3() Review Comment: This function has no try-catch. If one upload fails, we don't delete the files already uploaded. This will cause the next test to fail. It is helpful to not only get code done quickly, but to also get it done well to prevent cascading errors. So, please provide try/catch blocks that ensure that files are cleaned up even if something fails. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = 
"/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static AmazonS3 s3Client() + { + AWSCredentials credentials = new BasicAWSCredentials( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY") + ); + return AmazonS3ClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withRegion(System.getenv("AWS_REGION")) + .build(); + } + + public static String[] fileList() + { + return new String[] { + WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3 + }; + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + s3Client.putObject( + System.getenv("DRUID_CLOUD_BUCKET"), + System.getenv("DRUID_CLOUD_PATH") + "/" + file, Review Comment: Please check that the env var is 
set. This code appears to be a quick hack to get things done ASAP, but without normal coding practices. This code bypasses the mechanism that exists for this Druid-specific value. It does not check if the value is set. It fetches the env var for every object rather than fetching once and reusing the value (which is what the existing code does.) I'm hesitant to approve code that is overly quick and dirty. ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION + */ + +@RunWith(DruidTestRunner.class) +@Category(S3DeepStorage.class) +public class ITS3SQLBasedIngestionTest extends AbstractITSQLBasedIngestion +{ + @Inject + @Json + protected ObjectMapper jsonMapper; + private static AmazonS3 s3Client = s3Client(); + private static String datasource = "wikipedia_cloud_index_msq"; + private static final String CLOUD_INGEST_SQL = "/multi-stage-query/wikipedia_cloud_index_msq.sql"; + private static final String INDEX_QUERIES_FILE = 
"/multi-stage-query/wikipedia_index_queries.json"; + private static final String INPUT_SOURCE_URIS_KEY = "uris"; + private static final String INPUT_SOURCE_PREFIXES_KEY = "prefixes"; + private static final String INPUT_SOURCE_OBJECTS_KEY = "objects"; + private static final String WIKIPEDIA_DATA_1 = "wikipedia_index_data1.json"; + private static final String WIKIPEDIA_DATA_2 = "wikipedia_index_data2.json"; + private static final String WIKIPEDIA_DATA_3 = "wikipedia_index_data3.json"; + + public static Object[][] test_cases() + { + return new Object[][]{ + {new Pair<>(INPUT_SOURCE_URIS_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_1, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_2, + "s3://%%BUCKET%%/%%PATH%%/" + WIKIPEDIA_DATA_3 + ) + )}, + {new Pair<>(INPUT_SOURCE_PREFIXES_KEY, + ImmutableList.of( + "s3://%%BUCKET%%/%%PATH%%/" + ) + )}, + {new Pair<>(INPUT_SOURCE_OBJECTS_KEY, + ImmutableList.of( + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_1), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_2), + ImmutableMap.of("bucket", "%%BUCKET%%", "path", "%%PATH%%/" + WIKIPEDIA_DATA_3) + ) + )} + }; + } + + public static AmazonS3 s3Client() + { + AWSCredentials credentials = new BasicAWSCredentials( + System.getenv("AWS_ACCESS_KEY_ID"), + System.getenv("AWS_SECRET_ACCESS_KEY") + ); + return AmazonS3ClientBuilder + .standard() + .withCredentials(new AWSStaticCredentialsProvider(credentials)) + .withRegion(System.getenv("AWS_REGION")) + .build(); + } + + public static String[] fileList() + { + return new String[] { + WIKIPEDIA_DATA_1, WIKIPEDIA_DATA_2, WIKIPEDIA_DATA_3 + }; + } + + @BeforeClass + public static void uploadDataFilesToS3() + { + String localPath = "resources/data/batch_index/json/"; + for (String file : fileList()) { + s3Client.putObject( + System.getenv("DRUID_CLOUD_BUCKET"), + System.getenv("DRUID_CLOUD_PATH") + "/" + file, + new File(localPath + file) + ); + } + } + + 
@AfterClass + public static void deleteFilesFromS3() + { + // Delete uploaded data files + DeleteObjectsRequest delObjReq = new DeleteObjectsRequest(System.getenv("DRUID_CLOUD_BUCKET")) + .withKeys(fileList()); + s3Client.deleteObjects(delObjReq); + + // Delete segments created by druid + ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(System.getenv("DRUID_CLOUD_BUCKET")) + .withPrefix(System.getenv("DRUID_CLOUD_PATH") + "/" + datasource + "/"); + + ObjectListing objectListing = s3Client.listObjects(listObjectsRequest); + + while (true) { + for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { + s3Client.deleteObject(System.getenv("DRUID_CLOUD_BUCKET"), objectSummary.getKey()); + } + if (objectListing.isTruncated()) { + objectListing = s3Client.listNextBatchOfObjects(objectListing); + } else { + break; + } + } + } + + @Test + @Parameters(method = "test_cases") + @TestCaseName("Test_{index} ({0})") + public void testSQLBasedBatchIngestion(Pair<String, List> s3InputSource) + { + try { + String sqlTask = getStringFromFileAndReplaceDatasource(CLOUD_INGEST_SQL, datasource); + String inputSourceValue = jsonMapper.writeValueAsString(s3InputSource.rhs); + Map<String, Object> context = ImmutableMap.of("finalizeAggregations", false, + "maxNumTasks", 5, + "groupByEnableMultiValueUnnesting", false); + + sqlTask = StringUtils.replace( + sqlTask, + "%%INPUT_SOURCE_PROPERTY_KEY%%", + s3InputSource.lhs + ); + sqlTask = StringUtils.replace( + sqlTask, + "%%INPUT_SOURCE_PROPERTY_VALUE%%", + inputSourceValue + ); + + // Setting the correct object path in the sqlTask. 
+ sqlTask = StringUtils.replace( + sqlTask, + "%%BUCKET%%", + config.getCloudBucket() // Getting from DRUID_CLOUD_BUCKET env variable + ); + sqlTask = StringUtils.replace( + sqlTask, + "%%PATH%%", + config.getCloudPath() // Getting from DRUID_CLOUD_PATH env variable + ); + + submitTask(sqlTask, datasource, context); Review Comment: Does this submit the MSQ ingestion task? If so, please add a comment to note this. For MSQ, we have to submit a task to a specific endpoint, poll to wait for completion, and check completion status. Do we do that somewhere? ########## integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_cloud_index_msq.sql: ########## @@ -0,0 +1,32 @@ +REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL +WITH "source" as (SELECT * FROM TABLE( + EXTERN( + '{"type":"s3","%%INPUT_SOURCE_PROPERTY_KEY%%":%%INPUT_SOURCE_PROPERTY_VALUE%%}', + '{"type":"json"}', + '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]' + ) +)) +SELECT + TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time, + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + COUNT(*) AS "count", + SUM("added") AS "added", + SUM("deleted") AS "deleted", + SUM("delta") AS "delta", + APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch", + DS_QUANTILES_SKETCH("delta") AS 
"quantilesDoublesSketch", + APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild" +FROM "source" +GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 +PARTITIONED BY DAY Review Comment: Missing newline ########## integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITS3SQLBasedIngestionTest.java: ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import junitparams.Parameters; +import junitparams.naming.TestCaseName; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.testsEx.categories.S3DeepStorage; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * IMPORTANT: + * To run this test, you must set the following env variables in the build environment + * DRUID_CLOUD_BUCKET, DRUID_CLOUD_PATH, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION Review Comment: Please explain what these should be set to. `DRUID_CLOUD_BUCKET`: The full URL or just a name? `DRUID_CLOUD_PATH`: Same question. What is the relationship between this item and the above. `AWS_*` these are probably standard AWS settings? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
