This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 5ddcc8f276811f338053249e3ec1b2d4f90ffb4d Author: ayush.tripathi <[email protected]> AuthorDate: Thu Nov 21 03:20:11 2024 +0530 [ASTERIXDB-3503][EXT] Fixing Internal Error issue when Delta table does not exists. - user model changes: yes - storage format changes: no - interface changes: no Ext-ref: MB-64314 Change-Id: I8d403c8c0698d9d39dc8988eb8b57588a684dbed Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19098 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> Integration-Tests: Murtadha Hubail <[email protected]> --- .../deltalake-table-not-exists.00.ddl.sqlpp | 35 +++++++++++++++++ .../runtimets/testsuite_external_dataset_s3.xml | 7 ++++ .../reader/aws/delta/AwsS3DeltaReaderFactory.java | 44 ++++++++++++++-------- .../reader/aws/delta/DeltaFileRecordReader.java | 10 +++-- .../asterix/external/util/ExternalDataUtils.java | 30 +++++++++++++++ .../asterix/external/util/aws/s3/S3Utils.java | 5 ++- 6 files changed, 112 insertions(+), 19 deletions(-) diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp new file mode 100644 index 0000000000..c57de93232 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/s1"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index d56c1a46ec..d840527a27 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -339,6 +339,13 @@ <expected-error>Supported file format for 'delta' tables is 'parquet', but 'avro' was provided.</expected-error> </compilation-unit> </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-table-not-exists"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">none</output-dir> + <expected-error>ASX1108: External source error. io.delta.kernel.exceptions.TableNotFoundException: Delta table at path `s3a://playground/delta-data/s1` is not found.</expected-error> + </compilation-unit> + </test-case> <test-case FilePath="external-dataset"> <compilation-unit name="common/avro/avro-types/avro-map"> <placeholder name="adapter" value="S3" /> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java index 9d93fcea1c..8dc820becb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.input.record.reader.aws.delta; import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; import java.io.Serializable; @@ -33,6 +34,8 @@ import java.util.Set; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.api.IExternalDataRuntimeContext; import org.apache.asterix.external.api.IRecordReader; @@ -60,6 +63,7 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; @@ -87,20 +91,7 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { throws AlgebricksException, HyracksDataException { this.configuration = configuration; Configuration conf = new Configuration(); - conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); - conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); - if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) { - conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME)); - } - conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME)); - String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); - if (serviceEndpoint != null) { - conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); - } - conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, - configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, "")); - conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, - configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, "")); + configurationBuilder(configuration, conf); String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); @@ -109,7 +100,13 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { Engine engine = DefaultEngine.create(conf); io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); - Snapshot snapshot = table.getLatestSnapshot(engine); + Snapshot snapshot; + try { + snapshot = table.getLatestSnapshot(engine); + } catch (KernelException e) { + LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + } List<Warning> warnings = new ArrayList<>(); DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings); @@ -192,6 +189,23 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { partitionWorkLoadsBasedOnSize.addAll(workloadQueue); } + public static void configurationBuilder(Map<String, String> configuration, Configuration conf) { + conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); + conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); + if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) { + conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME)); + } + conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME)); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + if (serviceEndpoint != null) { + conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); + } + conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, + configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, "")); + conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, + configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, "")); + } + @Override public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException { try { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java index 558f8a96fe..a5b21b6cad 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java @@ -133,9 +133,13 @@ public class DeltaFileRecordReader implements IRecordReader<Row> { scanFile = scanFiles.get(fileIndex); fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile); physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState); - physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus), - physicalReadSchema, Optional.empty()); - dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter); + try { + physicalDataIter = engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus), + physicalReadSchema, Optional.empty()); + dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter); + } catch (IOException e) { + throw HyracksDataException.create(e); + } return this.hasNext(); } else { return false; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index b3118702a1..19c197916a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -35,6 +35,7 @@ import static org.apache.asterix.external.util.google.gcs.GCSUtils.validatePrope import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE; import static org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import static org.msgpack.core.MessagePack.Code.ARRAY16; import java.io.ByteArrayOutputStream; @@ -71,6 +72,7 @@ import org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReaderFactory; import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; +import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory; import org.apache.asterix.external.library.JavaLibrary; import org.apache.asterix.external.library.msgpack.MessagePackUtils; import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions; @@ -109,6 +111,12 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelException; public class ExternalDataUtils { @@ -117,6 +125,8 @@ public class ExternalDataUtils { private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024; private static final int HEADER_FUDGE = 64; + private static final Logger LOGGER = LogManager.getLogger(); + static { valueParserFactoryMap.put(ATypeTag.INTEGER, IntegerParserFactory.INSTANCE); valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE); @@ -504,6 +514,26 @@ public class ExternalDataUtils { } } + public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException { + Configuration conf = new Configuration(); + String tableMetadataPath = null; + if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE) + .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) { + AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf); + tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + } + Engine engine = DefaultEngine.create(conf); + io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); + try { + table.getLatestSnapshot(engine); + } catch (KernelException e) { + LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); + throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + } + } + public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf, String tableMetadataPath) throws AlgebricksException { HadoopTables tables = new HadoopTables(conf); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index 891d7f3bfa..bf0938b961 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -24,6 +24,7 @@ import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_P import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; +import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists; import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME; @@ -281,7 +282,6 @@ public class S3Utils { else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } - // Both parameters should be passed, or neither should be passed (for anonymous/no auth) String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME); String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME); @@ -346,6 +346,9 @@ public class S3Utils { if (!response.sdkHttpResponse().isSuccessful()) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); } + if (isDeltaTable(configuration)) { + validateDeltaTableExists(configuration); + } } /**
