This is an automated email from the ASF dual-hosted git repository. htowaileb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 1a28c658810ded5d50ff88f42ef88ec8f82926e1 Author: Hussain Towaileb <[email protected]> AuthorDate: Fri May 15 12:50:07 2020 +0300 [ASTERIXDB-2736][EXT] Ensure retrieving all objects if an S3 bucket has more than 1000 objects - user model changes: no - storage format changes: no - interface changes: no Details: - Fixed the AWS reader to handle reading more than 1000 objects coming in a single request. - Added a test case for the above mentioned item. Change-Id: Ic7891aa86852e07dfad9ce41de908b34f86bdb42 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6344 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> --- .../aws/AwsS3ExternalDatasetTest.java | 18 +++++++++++ .../over-1000-objects.000.ddl.sqlpp | 37 ++++++++++++++++++++++ .../over-1000-objects.001.query.sqlpp | 22 +++++++++++++ .../over-1000-objects.099.ddl.sqlpp | 20 ++++++++++++ .../s3/over-1000-objects/over-1000-objects.001.adm | 1 + .../runtimets/testsuite_external_dataset.xml | 5 +++ .../record/reader/aws/AwsS3InputStreamFactory.java | 32 ++++++++++++++++--- 7 files changed, 130 insertions(+), 5 deletions(-) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java index 55c78e3..23bf3bd 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java @@ -103,6 +103,10 @@ public class AwsS3ExternalDatasetTest { private static final String S3_MOCK_SERVER_BUCKET_CSV_DEFINITION = "csv-data/reviews/"; // data resides here private static final String S3_MOCK_SERVER_BUCKET_TSV_DEFINITION = "tsv-data/reviews/"; // data resides here + // This is used for a test to generate over 1000 number of files + private static final String OVER_1000_OBJECTS_PATH = "over-1000-objects"; + private static final int OVER_1000_OBJECTS_COUNT = 2999; + private static final Set<String> fileNames = new HashSet<>(); private static final CreateBucketRequest.Builder CREATE_BUCKET_BUILDER = CreateBucketRequest.builder(); private static final DeleteBucketRequest.Builder DELETE_BUCKET_BUILDER = DeleteBucketRequest.builder(); @@ -213,6 +217,10 @@ public class AwsS3ExternalDatasetTest { LOGGER.info("Adding TSV files to the bucket"); loadTsvFiles(); LOGGER.info("TSV Files added successfully"); + + LOGGER.info("Loading " + OVER_1000_OBJECTS_COUNT + " into " + OVER_1000_OBJECTS_PATH); + loadLargeNumberOfFiles(); + LOGGER.info("Added " + OVER_1000_OBJECTS_COUNT + " files into " + OVER_1000_OBJECTS_PATH + " successfully"); } /** @@ -381,6 +389,16 @@ public class AwsS3ExternalDatasetTest { } } + /** + * Generates over 1000 objects and upload them to S3 mock server, 1 record per object + */ + private static void loadLargeNumberOfFiles() { + for (int i = 0; i < OVER_1000_OBJECTS_COUNT; i++) { + RequestBody body = RequestBody.fromString("{\"id\":" + i + "}"); + client.putObject(builder.key(OVER_1000_OBJECTS_PATH + "/" + i + ".json").build(), body); + } + } + static class AwsTestExecutor extends TestExecutor { public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.000.ddl.sqlpp new file mode 100644 index 0000000..a26caeb --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.000.ddl.sqlpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +drop type test if exists; +create type test as open { +}; + +drop dataset test if exists; +create external dataset test(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="over-1000-objects"), +("format"="json") +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.query.sqlpp new file mode 100644 index 0000000..affdb87 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +select count(*) `count` from test; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.099.ddl.sqlpp new file mode 100644 index 0000000..548e632 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/over-1000-objects/over-1000-objects.099.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.adm new file mode 100644 index 0000000..b610b1d --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/over-1000-objects/over-1000-objects.001.adm @@ -0,0 +1 @@ +{ "count": 2999 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml index 551d777..5aa1326 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml @@ -67,5 +67,10 @@ <output-dir compare="Text">aws/s3/empty-string-definition</output-dir> </compilation-unit> </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="aws/s3/over-1000-objects"> + <output-dir compare="Text">aws/s3/over-1000-objects</output-dir> + </compilation-unit> + </test-case> </test-group> </test-suite> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index 58a77b1..9158a57 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -42,8 +42,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; -import software.amazon.awssdk.services.s3.model.ListObjectsRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; public class AwsS3InputStreamFactory implements IInputStreamFactory { @@ -86,13 +86,35 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { S3Client s3Client = buildAwsS3Client(configuration); // Get all objects in a bucket and extract the paths to files - ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container); + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME); if (path != null) { listObjectsBuilder.prefix(path + (!path.isEmpty() && !path.endsWith("/") ? "/" : "")); } - ListObjectsResponse listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build()); - List<S3Object> s3Objects = listObjectsResponse.contents(); + + ListObjectsV2Response listObjectsResponse; + List<S3Object> s3Objects = new ArrayList<>(); + boolean done = false; + String newMarker = null; + + while (!done) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); + } else { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); + } + + // Collect all the provided objects + s3Objects.addAll(listObjectsResponse.contents()); + + // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request + if (!listObjectsResponse.isTruncated()) { + done = true; + } else { + newMarker = listObjectsResponse.nextContinuationToken(); + } + } // Exclude the directories and get the files only String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT);
