This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 7707f1f02f59293e1aa3a899846cd54c676d02a8 Merge: be592d9 0381a66 Author: Hussain Towaileb <[email protected]> AuthorDate: Tue Feb 9 15:17:02 2021 +0300 Merge commit 'gerrit/mad-hatter' Change-Id: I0a1bf2f476c13d1a7dfc7a1ffc13858367ed870a .../apache/asterix/test/txn/LogManagerTest.java | 2 +- .../asterix/test/txn/RecoveryManagerTest.java | 18 +++ .../non-s3-region/external_dataset.000.ddl.sqlpp | 41 +++++++ .../non-s3-region/external_dataset.099.ddl.sqlpp | 20 ++++ .../runtimets/testsuite_external_dataset_s3.xml | 5 + asterixdb/asterix-external-data/pom.xml | 4 + .../record/reader/aws/AwsS3InputStreamFactory.java | 125 ++++++++++++++++----- .../external/util/ExternalDataConstants.java | 3 +- .../asterix/external/util/ExternalDataUtils.java | 97 ++++++++++------ .../management/service/logging/LogReader.java | 2 +- asterixdb/pom.xml | 5 + 11 files changed, 256 insertions(+), 66 deletions(-) diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java index 59d7fae,34e48c2..4f8df3d --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/RecoveryManagerTest.java @@@ -22,7 -22,10 +22,9 @@@ import java.io.File import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; import org.apache.asterix.common.TestDataUtil; + import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; + import org.apache.asterix.transaction.management.service.logging.LogManager; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/non-s3-region/external_dataset.000.ddl.sqlpp index 0000000,3b22f11..3b22f11 mode 000000,100644..100644 --- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/non-s3-region/external_dataset.000.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/non-s3-region/external_dataset.000.ddl.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/non-s3-region/external_dataset.099.ddl.sqlpp index 0000000,548e632..548e632 mode 000000,100644..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/non-s3-region/external_dataset.099.ddl.sqlpp +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/s3/non-s3-region/external_dataset.099.ddl.sqlpp diff --cc asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index e0a28ba,0000000..6557230 mode 100644,000000..100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@@ -1,304 -1,0 +1,309 @@@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. 
+ !--> +<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp" QueryFileExtension=".sqlpp"> + <test-group name="aws-s3-external-dataset"> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/json/json"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/json/json</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset/s3"> + <compilation-unit name="create-with-session-token"> + <output-dir compare="Text">create-with-session-token</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/json/gz"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/json/gz</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/json/mixed"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/json/mixed</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/csv/csv"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/csv/csv</output-dir> + </compilation-unit> + </test-case><test-case FilePath="external-dataset"> + <compilation-unit name="common/csv/gz"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/csv/gz</output-dir> + </compilation-unit> + </test-case><test-case FilePath="external-dataset"> + <compilation-unit name="common/csv/mixed"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/csv/mixed</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/tsv/tsv"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/tsv/tsv</output-dir> + </compilation-unit> + </test-case> + <test-case 
FilePath="external-dataset"> + <compilation-unit name="common/tsv/gz"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/tsv/gz</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/tsv/mixed"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/tsv/mixed</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/empty-string-definition"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/empty-string-definition</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/over-1000-objects"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/over-1000-objects</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/malformed-json"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/malformed-json</output-dir> + <expected-error>Parsing error at malformed-data/duplicate-fields.json line 1 field field: Duplicate field 'field'</expected-error> + <expected-error>Parsing error at malformed-data/malformed-json.json line 1 field field: Unexpected character ('}' (code 125)): was expecting double-quote to start field name</expected-error> + <expected-error>Parsing error at malformed-data/malformed-json-2.json line 4 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error> + <expected-error>Parsing error at malformed-data/malformed-jsonl-1.json line 3 field field2: Unrecognized token 'truee': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error> + <expected-error>Parsing error at 
malformed-data/malformed-jsonl-2.json line 11 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/definition-does-not-exist"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/definition-does-not-exist</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/invalid-endpoint"> + <placeholder name="adapter" value="S3" /> + <placeholder name="serviceEndpoint" value="^invalid-endpoint^" /> + <output-dir compare="Text">common/invalid-endpoint</output-dir> + <expected-error>External source error. Invalid service endpoint ^invalid-endpoint^</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/bucket-does-not-exist"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/bucket-does-not-exist</output-dir> + <expected-error>External source error. 
The specified bucket does not exist (Service: S3, Status Code: 404, Request ID: null)</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset" check-warnings="true"> + <compilation-unit name="common/no-files-returned/definition-points-to-nothing"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/no-files-returned/definition-points-to-nothing</output-dir> + <source-location>false</source-location> + <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn> + <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset" check-warnings="true"> + <compilation-unit name="common/no-files-returned/exclude-all-files"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/no-files-returned/exclude-all-files</output-dir> + <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset" check-warnings="true"> + <compilation-unit name="common/no-files-returned/include-no-files"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/no-files-returned/include-no-files</output-dir> + <expected-warn>The provided external dataset configuration returned no files from the external source</expected-warn> + </compilation-unit> + </test-case> ++ <test-case FilePath="external-dataset/s3"> ++ <compilation-unit name="non-s3-region"> ++ <output-dir compare="Text">non-s3-region</output-dir> ++ </compilation-unit> ++ </test-case> + <test-case FilePath="external-dataset/common"> + <compilation-unit name="query-with-limit-plan"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">query-with-limit-plan</output-dir> + </compilation-unit> 
+ </test-case> + </test-group> + <test-group name="s3-include-exclude"> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/bad-name-1"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/bad-name-1</output-dir> + <expected-error>Invalid format for property "exclude1"</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/bad-name-2"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/bad-name-2</output-dir> + <expected-error>Invalid format for property "exclude#"</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/bad-name-3"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/bad-name-3</output-dir> + <expected-error>Invalid format for property "exclude#hello"</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/both"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/both</output-dir> + <expected-error>The parameters "include" and "exclude" cannot be provided at the same time</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/exclude-all"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/exclude-all</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/exclude-1"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/exclude-1</output-dir> + </compilation-unit> + </test-case> + <test-case 
FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/exclude-2"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/exclude-2</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/exclude-3"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/exclude-3</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/exclude-4"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/exclude-4</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/exclude-5"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/exclude-5</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/exclude-6"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/exclude-6</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-all"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-all</output-dir> + <expected-error>Malformed input stream</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-1"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-1</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-2"> + <placeholder name="adapter" 
value="S3" /> + <output-dir compare="Text">common/include-exclude/include-2</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-3"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-3</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-4"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-4</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-5"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-5</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-6"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-6</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-7"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-7</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-8"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-8</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-9"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-9</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + 
<compilation-unit name="common/include-exclude/include-10"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-10</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-11"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-11</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/include-exclude/include-12"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/include-exclude/include-12</output-dir> + </compilation-unit> + </test-case> + </test-group> +</test-suite> diff --cc asterixdb/asterix-external-data/pom.xml index a3daba8,169bcb6..9676da6 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@@ -437,11 -436,11 +437,15 @@@ <artifactId>netty-all</artifactId> </dependency> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> <groupId>software.amazon.awssdk</groupId> + <artifactId>aws-core</artifactId> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> <artifactId>http-client-spi</artifactId> </dependency> <dependency> diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index 08f8dec,0bc4c40..9f0f05c --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@@ -43,11 -50,14 +43,14 @@@ import org.apache.hyracks.api.util.Clea import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; + import 
software.amazon.awssdk.services.s3.model.ListObjectsRequest; + import software.amazon.awssdk.services.s3.model.ListObjectsResponse; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; + import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.S3Object; -public class AwsS3InputStreamFactory implements IInputStreamFactory { +public class AwsS3InputStreamFactory extends AbstractExternalInputStreamFactory { private static final long serialVersionUID = 1L; @@@ -62,44 -91,58 +65,28 @@@ this.configuration = configuration; ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext(); - String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); - - List<S3Object> filesOnly = new ArrayList<>(); - // Ensure the validity of include/exclude - ExternalDataUtils.AwsS3.validateIncludeExclude(configuration); + ExternalDataUtils.validateIncludeExclude(configuration); ++ IncludeExcludeMatcher includeExcludeMatcher = getIncludeExcludeMatchers(); - // Get and compile the patterns for include/exclude if provided - List<Matcher> includeMatchers = new ArrayList<>(); - List<Matcher> excludeMatchers = new ArrayList<>(); - String pattern = null; - try { - for (Map.Entry<String, String> entry : configuration.entrySet()) { - if (entry.getKey().startsWith(KEY_INCLUDE)) { - pattern = entry.getValue(); - includeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher("")); - } else if (entry.getKey().startsWith(KEY_EXCLUDE)) { - pattern = entry.getValue(); - excludeMatchers.add(Pattern.compile(ExternalDataUtils.patternToRegex(pattern)).matcher("")); - } - } - } catch (PatternSyntaxException ex) { - throw new CompilationException(ErrorCode.INVALID_REGEX_PATTERN, pattern); - } - - List<Matcher> matchersList; - BiPredicate<List<Matcher>, String> p; - if (!includeMatchers.isEmpty()) { - 
matchersList = includeMatchers; - p = (matchers, key) -> ExternalDataUtils.matchPatterns(matchers, key); - } else if (!excludeMatchers.isEmpty()) { - matchersList = excludeMatchers; - p = (matchers, key) -> !ExternalDataUtils.matchPatterns(matchers, key); - } else { - matchersList = Collections.emptyList(); - p = (matchers, key) -> true; - } - - // Get all objects in a bucket and extract the paths to files ++ // Prepare to retrieve the objects + List<S3Object> filesOnly; + String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); - // Get all objects in a bucket and extract the paths to files - ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); - listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration)); - - ListObjectsV2Response listObjectsResponse; - boolean done = false; - String newMarker = null; - try { - while (!done) { - // List the objects from the start, or from the last marker in case of truncated result - if (newMarker == null) { - listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); - filesOnly = listS3Objects(s3Client, container, matchersList, p); ++ filesOnly = listS3Objects(s3Client, container, includeExcludeMatcher); + } catch (S3Exception ex) { + // New API is not implemented, try falling back to old API + try { + // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html + if (ex.awsErrorDetails().errorCode().equals("NotImplemented")) { - filesOnly = oldApiListS3Objects(s3Client, container, matchersList, p); ++ filesOnly = oldApiListS3Objects(s3Client, container, includeExcludeMatcher); } else { - listObjectsResponse = - s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); - } - - // Collect the paths to files only - IncludeExcludeMatcher includeExcludeMatcher = getIncludeExcludeMatchers(); - 
collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly); - - // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request - if (!listObjectsResponse.isTruncated()) { - done = true; - } else { - newMarker = listObjectsResponse.nextContinuationToken(); + throw ex; } + } catch (SdkException ex2) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2.getMessage()); } } catch (SdkException ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); @@@ -124,6 -167,84 +111,84 @@@ } /** + * Uses the latest API to retrieve the objects from the storage. + * + * @param s3Client S3 client + * @param container container name - * @param matchersList include/exclude matchers to apply - * @param predicate predicate to use for comparison ++ * @param includeExcludeMatcher include/exclude matchers to apply + */ - private List<S3Object> listS3Objects(S3Client s3Client, String container, List<Matcher> matchersList, - BiPredicate<List<Matcher>, String> predicate) { ++ private List<S3Object> listS3Objects(S3Client s3Client, String container, ++ IncludeExcludeMatcher includeExcludeMatcher) { + String newMarker = null; + List<S3Object> filesOnly = new ArrayList<>(); + + ListObjectsV2Response listObjectsResponse; + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); + listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration)); + + while (true) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); + } else { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); + } + + // Collect the paths to files only - collectAndFilterFiles(listObjectsResponse.contents(), 
predicate, matchersList, filesOnly); ++ collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), ++ includeExcludeMatcher.getMatchersList(), filesOnly); + + // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request + if (!listObjectsResponse.isTruncated()) { + break; + } else { + newMarker = listObjectsResponse.nextContinuationToken(); + } + } + + return filesOnly; + } + + /** + * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage + * + * @param s3Client S3 client + * @param container container name - * @param matchersList include/exclude matchers to apply - * @param predicate predicate to use for comparison ++ * @param includeExcludeMatcher include/exclude matchers to apply + */ - private List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, List<Matcher> matchersList, - BiPredicate<List<Matcher>, String> predicate) { ++ private List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, ++ IncludeExcludeMatcher includeExcludeMatcher) { + String newMarker = null; + List<S3Object> filesOnly = new ArrayList<>(); + + ListObjectsResponse listObjectsResponse; + ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container); + listObjectsBuilder.prefix(ExternalDataUtils.getPrefix(configuration)); + + while (true) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build()); + } else { + listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build()); + } + + // Collect the paths to files only - collectAndFilterFiles(listObjectsResponse.contents(), predicate, matchersList, filesOnly); ++ collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), ++ includeExcludeMatcher.getMatchersList(), 
filesOnly); + + // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request + if (!listObjectsResponse.isTruncated()) { + break; + } else { + newMarker = listObjectsResponse.nextMarker(); + } + } + + return filesOnly; + } + + /** * AWS S3 returns all the objects as paths, not differentiating between folder and files. The path is considered * a file if it does not end up with a "/" which is the separator in a folder structure. * diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 6be694b,53306c5..1e3ab45 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@@ -293,32 -288,7 +295,31 @@@ public class ExternalDataConstants public static final String REGION_FIELD_NAME = "region"; public static final String ACCESS_KEY_ID_FIELD_NAME = "accessKeyId"; public static final String SECRET_ACCESS_KEY_FIELD_NAME = "secretAccessKey"; + public static final String SESSION_TOKEN_FIELD_NAME = "sessionToken"; public static final String CONTAINER_NAME_FIELD_NAME = "container"; - public static final String DEFINITION_FIELD_NAME = "definition"; public static final String SERVICE_END_POINT_FIELD_NAME = "serviceEndpoint"; } + + public static class AzureBlob { + private AzureBlob() { + throw new AssertionError("do not instantiate"); + } + + public static final String CONTAINER_NAME_FIELD_NAME = "container"; + public static final String DEFINITION_FIELD_NAME = "definition"; + public static final String CONNECTION_STRING_FIELD_NAME = "connectionString"; + public static final String ACCOUNT_NAME_FIELD_NAME = "accountName"; + public static final String ACCOUNT_KEY_FIELD_NAME = "accountKey"; + public static final String SHARED_ACCESS_SIGNATURE_FIELD_NAME = "sharedAccessSignature"; + public static final String 
BLOB_ENDPOINT_FIELD_NAME = "blobEndpoint"; + public static final String ENDPOINT_SUFFIX_FIELD_NAME = "endpointSuffix"; + + // Connection string requires PascalCase (MyFieldFormat) + public static final String CONNECTION_STRING_CONNECTION_STRING = "ConnectionString"; + public static final String CONNECTION_STRING_ACCOUNT_NAME = "AccountName"; + public static final String CONNECTION_STRING_ACCOUNT_KEY = "AccountKey"; + public static final String CONNECTION_STRING_SHARED_ACCESS_SIGNATURE = "SharedAccessSignature"; + public static final String CONNECTION_STRING_BLOB_ENDPOINT = "BlobEndpoint"; + public static final String CONNECTION_STRING_ENDPOINT_SUFFIX = "EndpointSuffix"; + } } diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 537933e,c0a7a4d..ec5798f --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@@ -523,11 -476,8 +526,11 @@@ public class ExternalDataUtils switch (type) { case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3: - ExternalDataUtils.AwsS3.validateProperties(configuration, srcLoc, collector); + AwsS3.validateProperties(configuration, srcLoc, collector); break; + case ExternalDataConstants.KEY_ADAPTER_NAME_AZURE_BLOB: + ExternalDataUtils.Azure.validateProperties(configuration, srcLoc, collector); + break; default: // Nothing needs to be done break; @@@ -726,25 -625,9 +729,15 @@@ S3ClientBuilder builder = S3Client.builder(); // Credentials - AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey); + AwsCredentials credentials; + if (sessionToken != null) { + credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken); + } else { + credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey); + } + 
builder.credentialsProvider(StaticCredentialsProvider.create(credentials)); - - // Validate the region - List<Region> supportedRegions = S3Client.serviceMetadata().regions(); - Optional<Region> selectedRegion = - supportedRegions.stream().filter(region -> region.id().equalsIgnoreCase(regionId)).findFirst(); - - if (!selectedRegion.isPresent()) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, - String.format("region %s is not supported", regionId)); - } - builder.region(selectedRegion.get()); + builder.region(Region.of(regionId)); // Validate the service endpoint if present if (serviceEndpoint != null) { @@@ -809,82 -692,80 +802,120 @@@ CleanupUtils.close(s3Client, null); } } + + boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty() + : ((ListObjectsV2Response) response).contents().isEmpty(); + if (isEmpty && collector.shouldWarn()) { + Warning warning = + WarningUtil.forAsterix(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); + collector.warn(warning); + } + + // Returns 200 only in case the bucket exists, otherwise, throws an exception. However, to + // ensure coverage, check if the result is successful as well and not only catch exceptions + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); + } + } + + /** + * Checks for a single object in the specified bucket to determine if the bucket is empty or not. 
+ * + * @param s3Client s3 client + * @param container the container name + * @param prefix Prefix to be used + * @param useOldApi flag whether to use the old API or not + * + * @return returns the S3 response + */ + private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) { + S3Response response; + if (useOldApi) { + ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder(); + listObjectsBuilder.prefix(prefix); + response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build()); + } else { + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder(); + listObjectsBuilder.prefix(prefix); + response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build()); + } + return response; } + } + + public static class Azure { + private Azure() { + throw new AssertionError("do not instantiate"); + } /** - * @param configuration - * @throws CompilationException + * Builds the Azure storage account using the provided configuration + * + * @param configuration properties + * @return client */ - public static void validateIncludeExclude(Map<String, String> configuration) throws CompilationException { - // Ensure that include and exclude are not provided at the same time + ensure valid format or property - List<Map.Entry<String, String>> includes = new ArrayList<>(); - List<Map.Entry<String, String>> excludes = new ArrayList<>(); + public static BlobServiceClient buildAzureClient(Map<String, String> configuration) + throws CompilationException { + // TODO(Hussain): Need to ensure that all required parameters are present in a previous step + String connectionString = configuration.get(CONNECTION_STRING_FIELD_NAME); + String accountName = configuration.get(ACCOUNT_NAME_FIELD_NAME); + String accountKey = configuration.get(ACCOUNT_KEY_FIELD_NAME); + String sharedAccessSignature = configuration.get(SHARED_ACCESS_SIGNATURE_FIELD_NAME); + String 
blobEndpoint = configuration.get(BLOB_ENDPOINT_FIELD_NAME); + String endpointSuffix = configuration.get(ENDPOINT_SUFFIX_FIELD_NAME); + + // Construct the connection string + // Connection string format: name1=value1;name2=value2;.... + StringBuilder connectionStringBuilder = new StringBuilder(); + BlobServiceClientBuilder builder = new BlobServiceClientBuilder(); + + boolean authMethodFound = false; + + if (connectionString != null) { + // connection string + authMethodFound = true; + connectionStringBuilder.append(connectionString).append(";"); + } + + if (accountName != null && accountKey != null) { + if (authMethodFound) { + throw new CompilationException(ErrorCode.ONLY_SINGLE_AUTHENTICATION_IS_ALLOWED); + } + authMethodFound = true; + // account name + account key + connectionStringBuilder.append(CONNECTION_STRING_ACCOUNT_NAME).append("=").append(accountName) + .append(";").append(CONNECTION_STRING_ACCOUNT_KEY).append("=").append(accountKey).append(";"); + } - // Accepted formats are include, include#1, include#2, ... 
etc, same for excludes - for (Map.Entry<String, String> entry : configuration.entrySet()) { - String key = entry.getKey(); + if (accountName != null && sharedAccessSignature != null) { + if (authMethodFound) { + throw new CompilationException(ErrorCode.ONLY_SINGLE_AUTHENTICATION_IS_ALLOWED); + } + authMethodFound = true; + // account name + shared access token + connectionStringBuilder.append(CONNECTION_STRING_ACCOUNT_NAME).append("=").append(accountName) + .append(";").append(CONNECTION_STRING_SHARED_ACCESS_SIGNATURE).append("=") + .append(sharedAccessSignature).append(";"); + } - if (key.equals(ExternalDataConstants.KEY_INCLUDE)) { - includes.add(entry); - } else if (key.equals(ExternalDataConstants.KEY_EXCLUDE)) { - excludes.add(entry); - } else if (key.startsWith(ExternalDataConstants.KEY_INCLUDE) - || key.startsWith(ExternalDataConstants.KEY_EXCLUDE)) { - - // Split by the "#", length should be 2, left should be include/exclude, right should be integer - String[] splits = key.split("#"); - - if (key.startsWith(ExternalDataConstants.KEY_INCLUDE) && splits.length == 2 - && splits[0].equals(ExternalDataConstants.KEY_INCLUDE) - && NumberUtils.isIntegerNumericString(splits[1])) { - includes.add(entry); - } else if (key.startsWith(ExternalDataConstants.KEY_EXCLUDE) && splits.length == 2 - && splits[0].equals(ExternalDataConstants.KEY_EXCLUDE) - && NumberUtils.isIntegerNumericString(splits[1])) { - excludes.add(entry); + if (!authMethodFound) { + throw new CompilationException(ErrorCode.NO_AUTH_METHOD_PROVIDED); + } + + // Add blobEndpoint and endpointSuffix if present, adjust any '/' as needed + if (blobEndpoint != null) { + connectionStringBuilder.append(CONNECTION_STRING_BLOB_ENDPOINT).append("=").append(blobEndpoint) + .append(";"); + if (endpointSuffix != null) { + String endpointSuffixUpdated; + if (blobEndpoint.endsWith("/")) { + endpointSuffixUpdated = + endpointSuffix.startsWith("/") ? 
endpointSuffix.substring(1) : endpointSuffix; } else { - throw new CompilationException(ErrorCode.INVALID_PROPERTY_FORMAT, key); + endpointSuffixUpdated = endpointSuffix.startsWith("/") ? endpointSuffix : "/" + endpointSuffix; } + connectionStringBuilder.append(CONNECTION_STRING_ENDPOINT_SUFFIX).append("=") + .append(endpointSuffixUpdated).append(";"); } }
