This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 5ddcc8f276811f338053249e3ec1b2d4f90ffb4d
Author: ayush.tripathi <[email protected]>
AuthorDate: Thu Nov 21 03:20:11 2024 +0530

    [ASTERIXDB-3503][EXT] Fixing Internal Error issue when Delta table does not 
exist.
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: no
    
    Ext-ref: MB-64314
    
    Change-Id: I8d403c8c0698d9d39dc8988eb8b57588a684dbed
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19098
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Murtadha Hubail <[email protected]>
    Integration-Tests: Murtadha Hubail <[email protected]>
---
 .../deltalake-table-not-exists.00.ddl.sqlpp        | 35 +++++++++++++++++
 .../runtimets/testsuite_external_dataset_s3.xml    |  7 ++++
 .../reader/aws/delta/AwsS3DeltaReaderFactory.java  | 44 ++++++++++++++--------
 .../reader/aws/delta/DeltaFileRecordReader.java    | 10 +++--
 .../asterix/external/util/ExternalDataUtils.java   | 30 +++++++++++++++
 .../asterix/external/util/aws/s3/S3Utils.java      |  5 ++-
 6 files changed, 112 insertions(+), 19 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
new file mode 100644
index 0000000000..c57de93232
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-table-not-exists/deltalake-table-not-exists.00.ddl.sqlpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+ DROP DATAVERSE test IF EXISTS;
+ CREATE DATAVERSE test;
+
+ USE test;
+
+
+ CREATE TYPE DeltalakeTableType as {
+ };
+
+ CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING 
%adapter%
+ (
+   %template%,
+   ("container"="playground"),
+   ("definition"="delta-data/s1"),
+   ("table-format" = "delta")
+ );
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index d56c1a46ec..d840527a27 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -339,6 +339,13 @@
         <expected-error>Supported file format for 'delta' tables is 'parquet', 
but 'avro' was provided.</expected-error>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-dataset">
+      <compilation-unit name="common/deltalake-table-not-exists">
+        <placeholder name="adapter" value="S3" />
+        <output-dir compare="Text">none</output-dir>
+        <expected-error>ASX1108: External source error. 
io.delta.kernel.exceptions.TableNotFoundException: Delta table at path 
`s3a://playground/delta-data/s1` is not found.</expected-error>
+      </compilation-unit>
+    </test-case>
     <test-case FilePath="external-dataset">
       <compilation-unit name="common/avro/avro-types/avro-map">
         <placeholder name="adapter" value="S3" />
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 9d93fcea1c..8dc820becb 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;
 
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -33,6 +34,8 @@ import java.util.Set;
 import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRecordReader;
@@ -60,6 +63,7 @@ import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
 import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
 import io.delta.kernel.internal.InternalScanFileUtils;
 import io.delta.kernel.types.StructType;
 import io.delta.kernel.utils.CloseableIterator;
@@ -87,20 +91,7 @@ public class AwsS3DeltaReaderFactory implements 
IRecordReaderFactory<Object> {
             throws AlgebricksException, HyracksDataException {
         this.configuration = configuration;
         Configuration conf = new Configuration();
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, 
configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, 
configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-                
configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-        
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-                
configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
 ""));
+        configurationBuilder(configuration, conf);
         String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                 + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
@@ -109,7 +100,13 @@ public class AwsS3DeltaReaderFactory implements 
IRecordReaderFactory<Object> {
 
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
-        Snapshot snapshot = table.getLatestSnapshot(engine);
+        Snapshot snapshot;
+        try {
+            snapshot = table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", 
tableMetadataPath, e);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
+        }
 
         List<Warning> warnings = new ArrayList<>();
         DeltaConverterContext converterContext = new 
DeltaConverterContext(configuration, warnings);
@@ -192,6 +189,23 @@ public class AwsS3DeltaReaderFactory implements 
IRecordReaderFactory<Object> {
         partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
     }
 
+    public static void configurationBuilder(Map<String, String> configuration, 
Configuration conf) {
+        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, 
configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+            conf.set(S3Constants.HADOOP_SESSION_TOKEN, 
configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+        }
+        conf.set(S3Constants.HADOOP_REGION, 
configuration.get(S3Constants.REGION_FIELD_NAME));
+        String serviceEndpoint = 
configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        if (serviceEndpoint != null) {
+            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        }
+        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+                
configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+        
conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+                
configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
 ""));
+    }
+
     @Override
     public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext 
context) throws HyracksDataException {
         try {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 558f8a96fe..a5b21b6cad 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -133,9 +133,13 @@ public class DeltaFileRecordReader implements 
IRecordReader<Row> {
             scanFile = scanFiles.get(fileIndex);
             fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);
             physicalReadSchema = 
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);
-            physicalDataIter = 
engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
-                    physicalReadSchema, Optional.empty());
-            dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, 
physicalDataIter);
+            try {
+                physicalDataIter = 
engine.getParquetHandler().readParquetFiles(singletonCloseableIterator(fileStatus),
+                        physicalReadSchema, Optional.empty());
+                dataIter = Scan.transformPhysicalData(engine, scanState, 
scanFile, physicalDataIter);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
             return this.hasNext();
         } else {
             return false;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index b3118702a1..19c197916a 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -35,6 +35,7 @@ import static 
org.apache.asterix.external.util.google.gcs.GCSUtils.validatePrope
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
 import static 
org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 import static org.msgpack.core.MessagePack.Code.ARRAY16;
 
 import java.io.ByteArrayOutputStream;
@@ -71,6 +72,7 @@ import 
org.apache.asterix.external.api.IExternalDataSourceFactory.DataSourceType
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
+import 
org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
@@ -109,6 +111,12 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
 
 public class ExternalDataUtils {
 
@@ -117,6 +125,8 @@ public class ExternalDataUtils {
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
 
+    private static final Logger LOGGER = LogManager.getLogger();
+
     static {
         valueParserFactoryMap.put(ATypeTag.INTEGER, 
IntegerParserFactory.INSTANCE);
         valueParserFactoryMap.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
@@ -504,6 +514,26 @@ public class ExternalDataUtils {
         }
     }
 
+    public static void validateDeltaTableExists(Map<String, String> 
configuration) throws CompilationException {
+        Configuration conf = new Configuration();
+        String tableMetadataPath = null;
+        if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
+            AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf);
+            tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+                    + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                    + 
configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        }
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, 
tableMetadataPath);
+        try {
+            table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", 
tableMetadataPath, e);
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
+        }
+    }
+
     public static void prepareIcebergTableFormat(Map<String, String> 
configuration, Configuration conf,
             String tableMetadataPath) throws AlgebricksException {
         HadoopTables tables = new HadoopTables(conf);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 891d7f3bfa..bf0938b961 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -24,6 +24,7 @@ import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_P
 import static 
org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableExists;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME;
@@ -281,7 +282,6 @@ public class S3Utils {
         else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
-
         // Both parameters should be passed, or neither should be passed (for 
anonymous/no auth)
         String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
         String secretAccessKey = 
configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
@@ -346,6 +346,9 @@ public class S3Utils {
         if (!response.sdkHttpResponse().isSuccessful()) {
             throw new 
CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableExists(configuration);
+        }
     }
 
     /**

Reply via email to