This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit dec9f981e425ff749d74df9e08120c71c867a7ba Merge: d21a38bf55 7026faf391 Author: Michael Blow <[email protected]> AuthorDate: Mon Nov 25 07:51:43 2024 -0500 Merge branch 'gerrit/goldfish' into 'master' Change-Id: Ia6f1478cc62fb07c2eba3c6c9828b568c43a3cc0 .../rules/PushAggregateIntoNestedSubplanRule.java | 6 +- .../optimizer/rules/PushGroupByThroughProduct.java | 4 +- .../external_dataset/ExternalDatasetTestUtils.java | 4 + .../deltalake/DeltaAllTypeGenerator.java | 268 +++++++++++++++++++++ .../deltalake-all-type.00.ddl.sqlpp | 52 ++-- .../deltalake-all-type.01.query.sqlpp | 30 +-- .../deltalake-all-type.02.query.sqlpp | 30 +-- .../deltalake-field-access-pushdown.00.ddl.sqlpp | 53 ++-- .../deltalake-field-access-pushdown.01.query.sqlpp | 30 +-- .../deltalake-field-access-pushdown.02.query.sqlpp | 33 +-- .../deltalake-field-access-pushdown.03.query.sqlpp | 32 +-- .../deltalake-field-access-pushdown.04.query.sqlpp | 33 +-- .../deltalake-field-access-pushdown.05.query.sqlpp | 32 +-- .../deltalake-field-access-pushdown.06.query.sqlpp | 34 +-- .../deltalake-field-access-pushdown.07.query.sqlpp | 33 +-- .../deltalake-field-access-pushdown.08.query.sqlpp | 34 +-- .../deltalake-field-access-pushdown.09.query.sqlpp | 33 +-- .../deltalake-field-access-pushdown.10.query.sqlpp | 34 +-- .../deltalake-table-not-exists.00.ddl.sqlpp | 37 +-- .../deltalake-all-type/deltalake-all-type.01.adm | 5 + .../deltalake-all-type/deltalake-all-type.02.adm | 5 + .../deltalake-field-access-pushdown.01.adm | 5 + .../deltalake-field-access-pushdown.02.plan | 22 ++ .../deltalake-field-access-pushdown.03.adm | 5 + .../deltalake-field-access-pushdown.04.plan | 22 ++ .../deltalake-field-access-pushdown.05.adm | 5 + .../deltalake-field-access-pushdown.06.plan | 50 ++++ .../deltalake-field-access-pushdown.07.adm | 5 + .../deltalake-field-access-pushdown.08.plan | 50 ++++ .../deltalake-field-access-pushdown.09.adm | 5 + .../deltalake-field-access-pushdown.10.adm | 46 ++++ .../runtimets/testsuite_external_dataset_s3.xml | 19 ++ .../NoOpFrameOperationCallbackFactory.java | 5 + .../aws/delta/AsterixDeltaRuntimeException.java | 34 +-- .../aws/delta/AsterixTypeToDeltaTypeVisitor.java | 145 +++++++++++ .../reader/aws/delta/AwsS3DeltaReaderFactory.java | 150 +++++++++--- .../reader/aws/delta/DeltaFileRecordReader.java | 10 +- .../aws/delta/converter/DeltaConverterContext.java | 10 +- .../asterix/external/parser/DeltaDataParser.java | 13 +- .../asterix/external/util/ExternalDataUtils.java | 34 ++- .../asterix/external/util/aws/s3/S3AuthUtils.java | 4 + .../external/util/google/gcs/GCSConstants.java | 12 + .../asterix/external/util/google/gcs/GCSUtils.java | 35 ++- .../LSMPrimaryInsertOperatorNodePushable.java | 10 +- .../LSMPrimaryUpsertOperatorNodePushable.java | 16 +- .../runtime/operators/StandardBatchController.java | 45 ++++ .../algebra/util/OperatorManipulationUtil.java | 19 +- .../rewriter/rules/AbstractDecorrelationRule.java | 4 +- .../subplan/IntroduceGroupByForSubplanRule.java | 7 +- .../apache/hyracks/api/comm/IFrameAppender.java | 2 +- .../hyracks/api/util/HyracksThrowingAction.java} | 30 +-- .../org/apache/hyracks/api/util/InvokeUtil.java | 36 +++ .../common/io/MessagingFrameTupleAppender.java | 9 - .../hyracks/dataflow/common/utils/TaskUtil.java | 14 ++ ...perationCallback.java => IBatchController.java} | 29 +-- .../am/lsm/common/api/IFrameOperationCallback.java | 14 +- .../storage/am/lsm/common/api/ILSMHarness.java | 9 +- .../storage/am/lsm/common/impls/LSMHarness.java | 31 ++- .../am/lsm/common/impls/LSMTreeIndexAccessor.java | 6 +- 59 files changed, 1232 insertions(+), 557 deletions(-) diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java index 2ba1844a86,bf0938b961..f36d25dd5f --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java @@@ -468,5 -346,241 +469,8 @@@ public class S3AuthUtils if (!response.sdkHttpResponse().isSuccessful()) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); } + if (isDeltaTable(configuration)) { + validateDeltaTableExists(configuration); + } } - - /** - * Checks for a single object in the specified bucket to determine if the bucket is empty or not. - * - * @param s3Client s3 client - * @param container the container name - * @param prefix Prefix to be used - * @param useOldApi flag whether to use the old API or not - * @return returns the S3 response - */ - private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) { - S3Response response; - if (useOldApi) { - ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder(); - listObjectsBuilder.prefix(prefix); - response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build()); - } else { - ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder(); - listObjectsBuilder.prefix(prefix); - response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build()); - } - return response; - } - - /** - * Returns the lists of S3 objects. - * - * @param configuration properties - * @param includeExcludeMatcher include/exclude matchers to apply - */ - public static List<S3Object> listS3Objects(Map<String, String> configuration, - AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher, - IWarningCollector warningCollector, ExternalDataPrefix externalDataPrefix, - IExternalFilterEvaluator evaluator) throws CompilationException, HyracksDataException { - // Prepare to retrieve the objects - List<S3Object> filesOnly; - String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - S3Client s3Client = buildAwsS3Client(configuration); - String prefix = getPrefix(configuration); - - try { - filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher, externalDataPrefix, evaluator, - warningCollector); - } catch (S3Exception ex) { - // New API is not implemented, try falling back to old API - try { - // For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) { - filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher, - externalDataPrefix, evaluator, warningCollector); - } else { - throw ex; - } - } catch (SdkException ex2) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex2, getMessageOrToString(ex)); - } - } catch (SdkException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); - } finally { - if (s3Client != null) { - CleanupUtils.close(s3Client, null); - } - } - - // Warn if no files are returned - if (filesOnly.isEmpty() && warningCollector.shouldWarn()) { - Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES); - warningCollector.warn(warning); - } - - return filesOnly; - } - - /** - * Uses the latest API to retrieve the objects from the storage. - * - * @param s3Client S3 client - * @param container container name - * @param prefix definition prefix - * @param includeExcludeMatcher include/exclude matchers to apply - */ - private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix, - AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher, - ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator, - IWarningCollector warningCollector) throws HyracksDataException { - String newMarker = null; - List<S3Object> filesOnly = new ArrayList<>(); - - ListObjectsV2Response listObjectsResponse; - ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container); - listObjectsBuilder.prefix(prefix); - - while (true) { - // List the objects from the start, or from the last marker in case of truncated result - if (newMarker == null) { - listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build()); - } else { - listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build()); - } - - // Collect the paths to files only - collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator, - warningCollector); - - // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request - if (listObjectsResponse.isTruncated() != null && listObjectsResponse.isTruncated()) { - newMarker = listObjectsResponse.nextContinuationToken(); - } else { - break; - } - } - - return filesOnly; - } - - /** - * Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage - * - * @param s3Client S3 client - * @param container container name - * @param prefix definition prefix - * @param includeExcludeMatcher include/exclude matchers to apply - */ - private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix, - AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher, - ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator evaluator, - IWarningCollector warningCollector) throws HyracksDataException { - String newMarker = null; - List<S3Object> filesOnly = new ArrayList<>(); - - ListObjectsResponse listObjectsResponse; - ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container); - listObjectsBuilder.prefix(prefix); - - while (true) { - // List the objects from the start, or from the last marker in case of truncated result - if (newMarker == null) { - listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build()); - } else { - listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build()); - } - - // Collect the paths to files only - collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(), - includeExcludeMatcher.getMatchersList(), filesOnly, externalDataPrefix, evaluator, - warningCollector); - - // Mark the flag as done if done, otherwise, get the marker of the previous response for the next request - if (listObjectsResponse.isTruncated() != null && listObjectsResponse.isTruncated()) { - newMarker = listObjectsResponse.nextMarker(); - } else { - break; - } - } - - return filesOnly; - } - - /** - * Collects only files that pass all tests - * - * @param s3Objects s3 objects - * @param predicate predicate - * @param matchers matchers - * @param filesOnly filtered files - * @param externalDataPrefix external data prefix - * @param evaluator evaluator - */ - private static void collectAndFilterFiles(List<S3Object> s3Objects, BiPredicate<List<Matcher>, String> predicate, - List<Matcher> matchers, List<S3Object> filesOnly, ExternalDataPrefix externalDataPrefix, - IExternalFilterEvaluator evaluator, IWarningCollector warningCollector) throws HyracksDataException { - for (S3Object object : s3Objects) { - if (ExternalDataUtils.evaluate(object.key(), predicate, matchers, externalDataPrefix, evaluator, - warningCollector)) { - filesOnly.add(object); - } - } - } - - public static Map<String, List<String>> S3ObjectsOfSingleDepth(Map<String, String> configuration, String container, - String prefix) throws CompilationException, HyracksDataException { - // create s3 client - S3Client s3Client = buildAwsS3Client(configuration); - // fetch all the s3 objects - return listS3ObjectsOfSingleDepth(s3Client, container, prefix); - } - - /** - * Uses the latest API to retrieve the objects from the storage of a single level. - * - * @param s3Client S3 client - * @param container container name - * @param prefix definition prefix - */ - private static Map<String, List<String>> listS3ObjectsOfSingleDepth(S3Client s3Client, String container, - String prefix) throws HyracksDataException { - Map<String, List<String>> allObjects = new HashMap<>(); - ListObjectsV2Iterable listObjectsInterable; - ListObjectsV2Request.Builder listObjectsBuilder = - ListObjectsV2Request.builder().bucket(container).prefix(prefix).delimiter("/"); - listObjectsBuilder.prefix(prefix); - List<String> files = new ArrayList<>(); - List<String> folders = new ArrayList<>(); - // to skip the prefix as a file from the response - boolean checkPrefixInFile = true; - listObjectsInterable = s3Client.listObjectsV2Paginator(listObjectsBuilder.build()); - for (ListObjectsV2Response response : listObjectsInterable) { - // put all the files - for (S3Object object : response.contents()) { - String fileName = object.key(); - fileName = fileName.substring(prefix.length(), fileName.length()); - if (checkPrefixInFile) { - if (prefix.equals(object.key())) - checkPrefixInFile = false; - else { - files.add(fileName); - } - } else { - files.add(fileName); - } - } - // put all the folders - for (CommonPrefix object : response.commonPrefixes()) { - String folderName = object.prefix(); - folderName = folderName.substring(prefix.length(), folderName.length()); - folders.add(folderName.endsWith("/") ? folderName.substring(0, folderName.length() - 1) : folderName); - } - } - allObjects.put("files", files); - allObjects.put("folders", folders); - return allObjects; - } }
