This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 93405ea67635cb928146569767d7fde7bd523970 Author: Hussain Towaileb <[email protected]> AuthorDate: Wed May 27 20:19:04 2020 +0300 [ASTERIXDB-2722][EXT] Require minimum permissions + improve error reporting - user model changes: no - storage format changes: no - interface changes: no Details: - Creating an external dataset requires S3:ListBucket permission. - Querying an external dataset requires S3:ListBucket and S3:GetObject permissions. - Improved error reporting. - Added test cases for the above mentioned items (where applicable). Change-Id: Idc266cf63b8f92a07af7341118d2636673913160 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6463 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Dmitry Lychagin <[email protected]> --- asterixdb/asterix-app/pom.xml | 4 - .../asterix/app/translator/QueryTranslator.java | 22 +++- .../s3/bucket-does-not-exist/test.000.ddl.sqlpp | 37 +++++++ .../s3/bucket-does-not-exist/test.099.ddl.sqlpp | 20 ++++ .../definition-does-not-exist/test.000.ddl.sqlpp | 37 +++++++ .../definition-does-not-exist/test.001.query.sqlpp | 22 ++++ .../definition-does-not-exist/test.099.ddl.sqlpp | 20 ++++ .../aws/s3/invalid-endpoint/test.000.ddl.sqlpp | 37 +++++++ .../aws/s3/invalid-endpoint/test.099.ddl.sqlpp | 20 ++++ .../s3/definition-does-not-exist/result.001.adm | 1 + .../runtimets/testsuite_external_dataset.xml | 17 ++++ .../asterix/common/exceptions/ErrorCode.java | 2 + .../src/main/resources/asx_errormsg/en.properties | 2 + asterixdb/asterix-external-data/pom.xml | 8 ++ .../input/record/reader/aws/AwsS3InputStream.java | 60 ++++------- .../record/reader/aws/AwsS3InputStreamFactory.java | 108 ++++++++------------ .../external/util/ExternalDataConstants.java | 7 +- .../asterix/external/util/ExternalDataUtils.java | 112 +++++++++++++++++++++ asterixdb/pom.xml | 5 + 19 files changed, 424 insertions(+), 117 deletions(-) diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index 219595b..e3702a6 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -703,22 +703,18 @@ <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>sdk-core</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>s3</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>regions</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>auth</artifactId> - <scope>test</scope> </dependency> <!-- Mock for AWS S3 --> <dependency> diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 0f540ff..ed3eb8b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.app.translator; -import static org.apache.asterix.common.exceptions.ErrorCode.UNKNOWN_DATAVERSE; - import java.io.File; import java.io.FileInputStream; import java.io.InputStream; @@ -637,7 +635,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new CompilationException(ErrorCode.UNKNOWN_DATAVERSE, sourceLoc, dataverseName); } - IDatasetDetails datasetDetails = null; + IDatasetDetails datasetDetails; Dataset ds = metadataProvider.findDataset(dataverseName, datasetName); if (ds != null) { if (dd.getIfNotExists()) { @@ -738,7 +736,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen createExternalDatasetProperties(dataverseName, dd, metadataProvider, mdTxnCtx); ExternalDataUtils.normalize(properties); ExternalDataUtils.validate(properties); - validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation()); + validateExternalDatasetProperties(externalDetails, properties, dd.getSourceLocation(), mdTxnCtx); datasetDetails = new ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(), TransactionState.COMMIT); break; @@ -3236,12 +3234,26 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } protected void validateExternalDatasetProperties(ExternalDetailsDecl externalDetails, - Map<String, String> properties, SourceLocation srcLoc) throws CompilationException { + Map<String, String> properties, SourceLocation srcLoc, MetadataTransactionContext mdTxnCtx) + throws AlgebricksException, HyracksDataException { String adapter = externalDetails.getAdapter(); // "format" parameter is needed for "S3" data source if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(adapter) && properties.get(ExternalDataConstants.KEY_FORMAT) == null) { throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } + + Map<String, String> details = new HashMap<>(properties); + details.put(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, adapter); + validateExternalSourceContainer(details); + } + + /** + * Ensures that the external source container is present + * + * @param configuration external source properties + */ + protected void validateExternalSourceContainer(Map<String, String> configuration) throws CompilationException { + ExternalDataUtils.validateExternalSourceContainer(configuration); } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp new file mode 100644 index 0000000..b174162 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.000.ddl.sqlpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +drop type test if exists; +create type test as open { +}; + +drop dataset test if exists; +create external dataset test(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="bucket-does-not-exist"), +("definition"="over-1000-objects"), +("format"="json") +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp new file mode 100644 index 0000000..548e632 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/bucket-does-not-exist/test.099.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp new file mode 100644 index 0000000..7a86e69 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.000.ddl.sqlpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +drop type test if exists; +create type test as open { +}; + +drop dataset test if exists; +create external dataset test(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="http://localhost:8001"), +("container"="playground"), +("definition"="definition-does-not-exist"), +("format"="json") +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp new file mode 100644 index 0000000..affdb87 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.001.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use test; + +select count(*) `count` from test; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp new file mode 100644 index 0000000..548e632 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/definition-does-not-exist/test.099.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp new file mode 100644 index 0000000..da2b945 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.000.ddl.sqlpp @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; +create dataverse test; +use test; + +drop type test if exists; +create type test as open { +}; + +drop dataset test if exists; +create external dataset test(test) using S3 ( +("accessKeyId"="dummyAccessKey"), +("secretAccessKey"="dummySecretKey"), +("region"="us-west-2"), +("serviceEndpoint"="^invalid-endpoint^"), +("container"="bucket-does-not-exist"), +("definition"="over-1000-objects"), +("format"="json") +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp new file mode 100644 index 0000000..548e632 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/aws/s3/invalid-endpoint/test.099.ddl.sqlpp @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +drop dataverse test if exists; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm new file mode 100644 index 0000000..c1a0ea2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/aws/s3/definition-does-not-exist/result.001.adm @@ -0,0 +1 @@ +{ "count": 0 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml index b0346f8..7c1aa48 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset.xml @@ -82,5 +82,22 @@ <expected-error>Parsing error at malformed-data/malformed-jsonl-2.json line 11 field array_f: Unexpected character (']' (code 93)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')</expected-error> </compilation-unit> </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="aws/s3/definition-does-not-exist"> + <output-dir compare="Text">aws/s3/definition-does-not-exist</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="aws/s3/invalid-endpoint"> + <output-dir compare="Text">aws/s3/invalid-endpoint</output-dir> + <expected-error>External source error. Invalid service endpoint ^invalid-endpoint^</expected-error> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="aws/s3/bucket-does-not-exist"> + <output-dir compare="Text">aws/s3/bucket-does-not-exist</output-dir> + <expected-error>External source error. The specified bucket does not exist (Service: S3, Status Code: 404, Request ID: null)</expected-error> + </compilation-unit> + </test-case> </test-group> </test-suite> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 37043b2..6496c94 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -196,6 +196,8 @@ public class ErrorCode { public static final int OPERATION_NOT_SUPPORTED_ON_PRIMARY_INDEX = 1105; public static final int EXPECTED_CONSTANT_VALUE = 1106; public static final int UNEXPECTED_HINT = 1107; + public static final int EXTERNAL_SOURCE_ERROR = 1108; + public static final int EXTERNAL_SOURCE_CONTAINER_NOT_FOUND = 1109; // Feed errors public static final int DATAFLOW_ILLEGAL_STATE = 3001; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index d3a2215..7e75a51 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -191,6 +191,8 @@ 1105 = Operation not supported on primary index %1$s 1106 = Expected constant value 1107 = Unexpected hint: %1$s. %2$s expected at this location +1108 = External source error. %1$s +1109 = External source container %1$s not found # Feed Errors 3001 = Illegal state. diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 30e7770..8270d71 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -437,6 +437,14 @@ </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> + <artifactId>http-client-spi</artifactId> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>sdk-core</artifactId> + </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> <artifactId>s3</artifactId> </dependency> <dependency> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index 448d3f5..bcbf540 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -18,23 +18,24 @@ */ package org.apache.asterix.external.input.record.reader.aws; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants; +import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3; import java.io.IOException; -import java.net.URI; import java.util.List; import java.util.Map; import java.util.zip.GZIPInputStream; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.external.input.stream.AbstractMultipleInputStream; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataUtils; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.model.GetObjectRequest; public class AwsS3InputStream extends AbstractMultipleInputStream { @@ -48,7 +49,7 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { private final List<String> filePaths; private int nextFileIndex = 0; - public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) { + public AwsS3InputStream(Map<String, String> configuration, List<String> filePaths) throws HyracksDataException { this.configuration = configuration; this.filePaths = filePaths; this.s3Client = buildAwsS3Client(configuration); @@ -74,13 +75,17 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { CleanupUtils.close(in, null); } - String bucket = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME); + String bucket = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); GetObjectRequest.Builder getObjectBuilder = GetObjectRequest.builder(); GetObjectRequest getObjectRequest = getObjectBuilder.bucket(bucket).key(filePaths.get(nextFileIndex)).build(); // Have a reference to the S3 stream to ensure that if GZipInputStream causes an IOException because of reading // the header, then the S3 stream gets closed in the close method - in = s3Client.getObject(getObjectRequest); + try { + in = s3Client.getObject(getObjectRequest); + } catch (SdkException ex) { + throw new RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); + } // Use gzip stream if needed String filename = filePaths.get(nextFileIndex).toLowerCase(); @@ -96,6 +101,14 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { return true; } + private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException { + try { + return ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); + } catch (CompilationException ex) { + throw HyracksDataException.create(ex); + } + } + @Override public boolean stop() { return false; @@ -126,33 +139,4 @@ public class AwsS3InputStream extends AbstractMultipleInputStream { private String getStreamNameAt(int fileIndex) { return fileIndex < 0 || filePaths == null || filePaths.isEmpty() ? "" : filePaths.get(fileIndex); } - - /** - * Prepares and builds the Amazon S3 client with the provided configuration - * - * @param configuration S3 client configuration - * - * @return Amazon S3 client - */ - private static S3Client buildAwsS3Client(Map<String, String> configuration) { - S3ClientBuilder builder = S3Client.builder(); - - // Credentials - String accessKeyId = configuration.get(AwsS3Constants.ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(AwsS3Constants.SECRET_ACCESS_KEY_FIELD_NAME); - AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey); - builder.credentialsProvider(StaticCredentialsProvider.create(credentials)); - - // Region - String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME); - builder.region(Region.of(region)); - - // Use user's endpoint if provided - if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) { - String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME); - builder.endpointOverride(URI.create(endPoint)); - } - - return builder.build(); - } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java index 9158a57..e57b31a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java @@ -18,30 +18,30 @@ */ package org.apache.asterix.external.input.record.reader.aws; -import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3Constants; +import static org.apache.asterix.external.util.ExternalDataConstants.AwsS3; import java.io.Serializable; -import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.api.AsterixInputStream; import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; @@ -52,7 +52,8 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { private Map<String, String> configuration; // Files to read from - private List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); + private final List<S3Object> filesOnly = new ArrayList<>(); + private final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); private transient AlgebricksAbsolutePartitionConstraint partitionConstraint; @@ -67,7 +68,7 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { } @Override - public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) { + public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition) throws HyracksDataException { return new AwsS3InputStream(configuration, partitionWorkLoadsBasedOnSize.get(partition).getFilePaths()); } @@ -81,51 +82,57 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { this.configuration = configuration; ICcApplicationContext ccApplicationContext = (ICcApplicationContext) ctx.getApplicationContext(); - String container = configuration.get(AwsS3Constants.CONTAINER_NAME_FIELD_NAME); + String container = configuration.get(AwsS3.CONTAINER_NAME_FIELD_NAME); - S3Client s3Client = buildAwsS3Client(configuration); + S3Client s3Client = ExternalDataUtils.AwsS3.buildAwsS3Client(configuration); // Get all objects in a bucket and extract the paths to files ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); - String path = configuration.get(AwsS3Constants.DEFINITION_FIELD_NAME); + String path = configuration.get(AwsS3.DEFINITION_FIELD_NAME); if (path != null) { listObjectsBuilder.prefix(path + (!path.isEmpty() && !path.endsWith("/") ? "/" : "")); } ListObjectsV2Response listObjectsResponse; - List<S3Object> s3Objects = new ArrayList<>(); boolean done = false; String newMarker = null; - while (!done) { - // List the objects from the start, or from the last marker in case of truncated result - if (newMarker == null) { - listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); - } else { - listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); - } - - // Collect all the provided objects - s3Objects.addAll(listObjectsResponse.contents()); + String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT); - // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request - if (!listObjectsResponse.isTruncated()) { - done = true; - } else { - newMarker = listObjectsResponse.nextContinuationToken(); + try { + while (!done) { + // List the objects from the start, or from the last marker in case of truncated result + if (newMarker == null) { + listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); + } else { + listObjectsResponse = + s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); + } + + // Collect the paths to files only + collectFilesOnly(listObjectsResponse.contents(), fileFormat); + + // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request + if (!listObjectsResponse.isTruncated()) { + done = true; + } else { + newMarker = listObjectsResponse.nextContinuationToken(); + } + } + } catch (SdkException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); + } finally { + if (s3Client != null) { + CleanupUtils.close(s3Client, null); } } - // Exclude the directories and get the files only - String fileFormat = configuration.get(ExternalDataConstants.KEY_FORMAT); - List<S3Object> fileObjects = getFilesOnly(s3Objects, fileFormat); - // Partition constraints partitionConstraint = ccApplicationContext.getClusterStateManager().getClusterLocations(); int partitionsCount = partitionConstraint.getLocations().length; // Distribute work load amongst the partitions - distributeWorkLoad(fileObjects, partitionsCount); + distributeWorkLoad(filesOnly, partitionsCount); } /** @@ -133,21 +140,17 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { * a file if it does not end up with a "/" which is the separator in a folder structure. * * @param s3Objects List of returned objects - * - * @return A list of string paths that point to files only + * @param fileFormat The expected file format * * @throws AsterixException AsterixException */ - private List<S3Object> getFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException { - List<S3Object> filesOnly = new ArrayList<>(); + private void collectFilesOnly(List<S3Object> s3Objects, String fileFormat) throws AsterixException { String fileExtension = getFileExtension(fileFormat); if (fileExtension == null) { throw AsterixException.create(ErrorCode.PROVIDER_STREAM_RECORD_READER_UNKNOWN_FORMAT, fileFormat); } s3Objects.stream().filter(object -> isValidFile(object.key(), fileFormat)).forEach(filesOnly::add); - - return filesOnly; } /** @@ -214,35 +217,6 @@ public class AwsS3InputStreamFactory implements IInputStreamFactory { } /** - * Prepares and builds the Amazon S3 client with the provided configuration - * - * @param configuration S3 client configuration - * - * @return Amazon S3 client - */ - private static S3Client buildAwsS3Client(Map<String, String> configuration) { - S3ClientBuilder builder = S3Client.builder(); - - // Credentials - String accessKeyId = configuration.get(AwsS3Constants.ACCESS_KEY_ID_FIELD_NAME); - String secretAccessKey = configuration.get(AwsS3Constants.SECRET_ACCESS_KEY_FIELD_NAME); - AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey); - builder.credentialsProvider(StaticCredentialsProvider.create(credentials)); - - // Region - String region = configuration.get(AwsS3Constants.REGION_FIELD_NAME); - builder.region(Region.of(region)); - - // Use user's endpoint if provided - if (configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME) != null) { - String endPoint = configuration.get(AwsS3Constants.SERVICE_END_POINT_FIELD_NAME); - builder.endpointOverride(URI.create(endPoint)); - } - - return builder.build(); - } - - /** * Returns the file extension for the provided file format. * * @param format file format diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 63f57b6..e93c3b9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -82,7 +82,7 @@ public class ExternalDataConstants { public static final String KEY_WAIT_FOR_DATA = "wait-for-data"; public static final String KEY_FEED_NAME = "feed"; // a string representing external bucket name - public static final String KEY_BUCKET = "bucket"; + public static final String KEY_EXTERNAL_SOURCE_TYPE = "type"; // a comma delimited list of nodes public static final String KEY_NODES = "nodes"; // a string representing the password used to authenticate with the external data source @@ -276,8 +276,9 @@ public class ExternalDataConstants { public static final String EMPTY_FIELD = "empty value"; public static final String INVALID_VAL = "invalid value"; - public static class AwsS3Constants { - private AwsS3Constants() { + public static class AwsS3 { + private AwsS3() { + throw new AssertionError("do not instantiate"); } public static final String REGION_FIELD_NAME = "region"; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 2ae2838..1501287 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -24,10 +24,15 @@ import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; +import java.net.URI; +import java.net.URISyntaxException; import java.util.EnumMap; +import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.library.ILibraryManager; @@ -40,6 +45,7 @@ import org.apache.asterix.om.types.ATypeTag; import org.apache.asterix.om.types.AUnionType; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.dataflow.common.data.parsers.BooleanParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.DoubleParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.FloatParserFactory; @@ -48,6 +54,15 @@ import org.apache.hyracks.dataflow.common.data.parsers.IntegerParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.LongParserFactory; import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; + public class ExternalDataUtils { private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class); @@ -436,4 +451,101 @@ public class ExternalDataUtils { configuration.put(key, paramValue.toLowerCase().trim()); } } + + /** + * Ensures that the external source container is present + * + * @param configuration external source properties + */ + public static void validateExternalSourceContainer(Map<String, String> configuration) throws CompilationException { + String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); + + switch (type) { + case ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3: + ExternalDataUtils.AwsS3.validateExternalSourceContainer(configuration); + break; + default: + // Nothing needs to be done + break; + } + } + + public static class AwsS3 { + private AwsS3() { + throw new AssertionError("do not instantiate"); + } + + public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException { + // TODO(Hussain): Need to ensure that all required parameters are present in a previous step + String accessKeyId = configuration.get(ExternalDataConstants.AwsS3.ACCESS_KEY_ID_FIELD_NAME); + String secretAccessKey = configuration.get(ExternalDataConstants.AwsS3.SECRET_ACCESS_KEY_FIELD_NAME); + String regionId = configuration.get(ExternalDataConstants.AwsS3.REGION_FIELD_NAME); + String serviceEndpoint = configuration.get(ExternalDataConstants.AwsS3.SERVICE_END_POINT_FIELD_NAME); + + S3ClientBuilder builder = S3Client.builder(); + + // Credentials + AwsBasicCredentials credentials = AwsBasicCredentials.create(accessKeyId, secretAccessKey); + builder.credentialsProvider(StaticCredentialsProvider.create(credentials)); + + // Validate the region + List<Region> supportedRegions = S3Client.serviceMetadata().regions(); + Optional<Region> selectedRegion = + supportedRegions.stream().filter(region -> region.id().equalsIgnoreCase(regionId)).findFirst(); + + if (!selectedRegion.isPresent()) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, + String.format("region %s is not supported", regionId)); + } + builder.region(selectedRegion.get()); + + // Validate the service endpoint if present + if (serviceEndpoint != null) { + try { + URI uri = new URI(serviceEndpoint); + try { + builder.endpointOverride(uri); + } catch (NullPointerException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); + } + } catch (URISyntaxException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, + String.format("Invalid service endpoint %s", serviceEndpoint)); + } + } + + return builder.build(); + } + + /** + * Validates if the container being used is available or not. + * + * @param configuration external datasource configuration + * + * @throws CompilationException Compilation exception + */ + public static void validateExternalSourceContainer(Map<String, String> configuration) + throws CompilationException { + S3Client s3Client = null; + + try { + String container = configuration.get(ExternalDataConstants.AwsS3.CONTAINER_NAME_FIELD_NAME); + s3Client = buildAwsS3Client(configuration); + ListObjectsV2Response response = + s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(container).maxKeys(1).build()); + + // Returns 200 only in case the bucket exists, however, otherwise, throws an exception. However, to + // ensure coverage, check if the result is successful as well and not only catch exceptions + if (!response.sdkHttpResponse().isSuccessful()) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); + } + } catch (SdkException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex.getMessage()); + } finally { + if (s3Client != null) { + CleanupUtils.close(s3Client, null); + } + } + } + } } diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml index cd5e060..9daf70f 100644 --- a/asterixdb/pom.xml +++ b/asterixdb/pom.xml @@ -1432,6 +1432,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>http-client-spi</artifactId> + <version>${awsjavasdk.version}</version> + </dependency> <!-- Mock for AWS S3 --> <dependency> <groupId>io.findify</groupId>
