This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 3164da5f58c2a760f3930876e796e9573f920c7b Author: ayush.tripathi <[email protected]> AuthorDate: Wed Aug 21 15:49:53 2024 +0530 [ASTERIXDB-3503][EXT] Deltalake format support. - user model changes: no - storage format changes: no - interface changes: yes Details: Creating external dataset from a given Deltalake table path (S3 as source). Ext-ref: MB-63840 Change-Id: Iff608397aab711f324861fe83eeb428f73682912 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18708 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ayush Tripathi <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> (cherry picked from commit b96e2e181ef4960aeabbb2c1ff228b7dae8048e7) Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18983 --- .../external_dataset/ExternalDatasetTestUtils.java | 34 ++++ .../aws/AwsS3ExternalDatasetTest.java | 2 + .../deltalake/DeltaTableGenerator.java | 224 +++++++++++++++++++++ .../deltalake-empty/deltalake-empty.00.ddl.sqlpp | 35 ++++ .../deltalake-empty/deltalake-empty.01.query.sqlpp | 22 ++ .../deltalake-modified-data.00.ddl.sqlpp | 35 ++++ .../deltalake-modified-data.01.query.sqlpp | 22 ++ .../deltalake-multiple_file_read.00.ddl.sqlpp | 35 ++++ .../deltalake-multiple_file_read.01.query.sqlpp | 22 ++ .../common/deltalake-empty/deltalake-empty.01.adm | 0 .../deltalake-modified-data.01.adm | 1 + .../deltalake-multiple-file-read/read-data.2.adm | 4 + .../s3/deltalake-empty/deltalake-empty.01.adm | 0 .../deltalake-modified-data.01.adm | 1 + .../external-dataset/s3/deltalake/read-data.2.adm | 4 + .../runtimets/testsuite_external_dataset_s3.xml | 19 ++ .../asterix/common/exceptions/ErrorCode.java | 1 + .../src/main/resources/asx_errormsg/en.properties | 1 + asterixdb/asterix-external-data/pom.xml | 5 + .../reader/aws/delta/AwsS3DeltaReaderFactory.java | 68 +++++++ .../provider/DatasourceFactoryProvider.java | 6 + .../external/util/ExternalDataConstants.java | 3 +-
.../asterix/external/util/ExternalDataUtils.java | 134 +++++++----- .../asterix/external/util/aws/s3/S3Utils.java | 8 +- ...pache.asterix.external.api.IRecordReaderFactory | 1 + 25 files changed, 635 insertions(+), 52 deletions(-) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java index 79d298a7b3..db1bf5039b 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java @@ -23,6 +23,7 @@ import static org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetT import static org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.BROWSE_CONTAINER; import static org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.DYNAMIC_PREFIX_AT_START_CONTAINER; import static org.apache.asterix.test.external_dataset.aws.AwsS3ExternalDatasetTest.FIXED_DATA_CONTAINER; +import static org.apache.asterix.test.external_dataset.deltalake.DeltaTableGenerator.DELTA_GEN_BASEDIR; import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.BINARY_GEN_BASEDIR; import java.io.BufferedWriter; @@ -36,9 +37,11 @@ import java.nio.file.Paths; import java.util.Collection; import org.apache.asterix.test.external_dataset.avro.AvroFileConverterUtil; +import org.apache.asterix.test.external_dataset.deltalake.DeltaTableGenerator; import org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil; import org.apache.asterix.testframework.context.TestCaseContext; import org.apache.commons.io.FilenameUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hyracks.api.util.IoUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -114,6 
+117,13 @@ public class ExternalDatasetTestUtils { AvroFileConverterUtil.convertToAvro(basePath, avroRawJsonDir, AVRO_GEN_BASEDIR); } + public static void createDeltaTable() throws IOException { + File basePath = new File("."); + // cleaning directory + BinaryFileConverterUtil.cleanBinaryDirectory(basePath, DELTA_GEN_BASEDIR); + DeltaTableGenerator.prepareDeltaTableContainer(new Configuration()); + } + /** * Generate binary files (e.g., parquet files) */ @@ -192,6 +202,10 @@ public class ExternalDatasetTestUtils { loadAvroFiles(); LOGGER.info("Avro files added successfully"); + LOGGER.info("Adding Delta Table files to the bucket"); + loadDeltaTableFiles(); + LOGGER.info("Delta files added successfully"); + LOGGER.info("Files added successfully"); } @@ -412,6 +426,26 @@ public class ExternalDatasetTestUtils { } } + private static void loadDeltaTableFiles() { + String generatedDataBasePath = DELTA_GEN_BASEDIR; + loadDeltaDirectory(generatedDataBasePath, "/empty_delta_table", PARQUET_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/empty_delta_table/_delta_log", JSON_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/modified_delta_table", PARQUET_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/modified_delta_table/_delta_log", JSON_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table", PARQUET_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/multiple_file_delta_table/_delta_log", JSON_FILTER, "delta-data/"); + } + + private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter, + String definitionPart) { + Collection<File> files = IoUtil.getMatchingFiles(Paths.get(dataBasePath + rootPath), filter); + for (File file : files) { + String fileName = file.getName(); + String externalFilterDefinition = file.getParent().substring(dataBasePath.length() + 1) + "/"; + loadData(file.getParent(), "", fileName, 
definitionPart + externalFilterDefinition, "", false, false); + } + } + private static void loadDirectory(String dataBasePath, String rootPath, FilenameFilter filter) { File dir = new File(dataBasePath, rootPath); if (!dir.exists() || !dir.isDirectory()) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java index 9a2bfbcc7c..665b8057ec 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/aws/AwsS3ExternalDatasetTest.java @@ -22,6 +22,7 @@ import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils. import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createAvroFilesRecursively; import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFiles; import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createBinaryFilesRecursively; +import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.createDeltaTable; import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setDataPaths; import static org.apache.asterix.test.external_dataset.ExternalDatasetTestUtils.setUploaders; import static org.apache.asterix.test.external_dataset.parquet.BinaryFileConverterUtil.DEFAULT_PARQUET_SRC_PATH; @@ -199,6 +200,7 @@ public class AwsS3ExternalDatasetTest { createBinaryFilesRecursively(EXTERNAL_FILTER_DATA_PATH); createAvroFiles(DEFAULT_PARQUET_SRC_PATH); createAvroFilesRecursively(EXTERNAL_FILTER_DATA_PATH); + createDeltaTable(); setNcEndpoints(testExecutor); startAwsS3MockServer(); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java new file mode 100644 index 0000000000..f041fa2e9c --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.test.external_dataset.deltalake; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hyracks.api.util.IoUtil; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; + +import io.delta.standalone.DeltaLog; +import io.delta.standalone.Operation; +import io.delta.standalone.OptimisticTransaction; +import io.delta.standalone.actions.Action; +import io.delta.standalone.actions.AddFile; +import io.delta.standalone.actions.Metadata; +import io.delta.standalone.actions.RemoveFile; +import io.delta.standalone.types.IntegerType; +import io.delta.standalone.types.StringType; +import io.delta.standalone.types.StructField; +import io.delta.standalone.types.StructType; + +public class DeltaTableGenerator { + public static final String DELTA_GEN_BASEDIR = "target" + File.separatorChar + "generated_delta_files"; + public static final String DELTA_EMPTY_TABLE = + "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "empty_delta_table"; + public static final String DELTA_MODIFIED_TABLE = + "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "modified_delta_table"; + public static final String DELTA_MULTI_FILE_TABLE = + "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "multiple_file_delta_table"; + + public static void prepareDeltaTableContainer(Configuration conf) { + File basePath = new File("."); + cleanBinaryDirectory(basePath, DELTA_GEN_BASEDIR); + prepareMultipleFilesTable(conf); + prepareModifiedTable(conf); + prepareEmptyTable(conf); + } + + public static void 
cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) { + try { + File destPath = new File(localDataRoot, binaryFilesPath); + //Delete old generated files + if (destPath.exists()) { + IoUtil.delete(destPath); + } + //Create new directory + Files.createDirectory(Paths.get(destPath.getAbsolutePath())); + } catch (IOException e) { + + } + + } + + public static void prepareEmptyTable(Configuration conf) { + List<Action> actions = List.of(); + DeltaLog log = DeltaLog.forTable(conf, DELTA_EMPTY_TABLE); + OptimisticTransaction txn = log.startTransaction(); + Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>()) + .schema(new StructType().add(new StructField("id", new IntegerType(), true)) + .add(new StructField("data", new StringType(), true))) + .build(); + txn.updateMetadata(metaData); + txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create"); + } + + public static void prepareModifiedTable(Configuration conf) { + Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("data").endRecord(); + try { + Path path = new Path(DELTA_MODIFIED_TABLE, "firstFile.parquet"); + ParquetWriter<GenericData.Record> writer = + AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build(); + + List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema), + new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileSecondSnapshotRecords = List.of(new GenericData.Record(schema)); + + fileFirstSnapshotRecords.get(0).put("id", 0); + fileFirstSnapshotRecords.get(0).put("data", "vibrant_mclean"); + + fileFirstSnapshotRecords.get(1).put("id", 1); + fileFirstSnapshotRecords.get(1).put("data", "frosty_wilson"); + + fileFirstSnapshotRecords.get(2).put("id", 2); + fileFirstSnapshotRecords.get(2).put("data", "serene_kirby"); + + fileSecondSnapshotRecords.get(0).put("id", 2); + 
fileSecondSnapshotRecords.get(0).put("data", "serene_kirby"); + + for (GenericData.Record record : fileFirstSnapshotRecords) { + writer.write(record); + } + + long size = writer.getDataSize(); + writer.close(); + + List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, + System.currentTimeMillis(), true, null, null)); + DeltaLog log = DeltaLog.forTable(conf, DELTA_MODIFIED_TABLE); + OptimisticTransaction txn = log.startTransaction(); + Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>()) + .schema(new StructType().add(new StructField("id", new IntegerType(), true)) + .add(new StructField("data", new StringType(), true))) + .build(); + txn.updateMetadata(metaData); + txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create"); + + Path path2 = new Path(DELTA_MODIFIED_TABLE, "secondFile.parquet"); + ParquetWriter<GenericData.Record> writer2 = + AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build(); + + for (GenericData.Record record : fileSecondSnapshotRecords) { + writer2.write(record); + } + long size2 = writer2.getDataSize(); + writer2.close(); + AddFile addFile = new AddFile("firstFile.parquet", new HashMap<>(), size, System.currentTimeMillis(), true, + null, null); + RemoveFile removeFile = addFile.remove(); + List<Action> actions3 = List.of(removeFile, new AddFile("secondFile.parquet", new HashMap<>(), size2, + System.currentTimeMillis(), true, null, null)); + OptimisticTransaction txn3 = log.startTransaction(); + txn3.commit(actions3, new Operation(Operation.Name.DELETE), "deltalake-table-delete"); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static void prepareMultipleFilesTable(Configuration conf) { + Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name") + .requiredString("age").endRecord(); + try { + Path path = new 
Path(DELTA_MULTI_FILE_TABLE, "firstFile.parquet"); + ParquetWriter<GenericData.Record> writer = + AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build(); + + List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema), + new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileSecondSnapshotRecords = List.of(new GenericData.Record(schema)); + + fileFirstSnapshotRecords.get(0).put("id", 0); + fileFirstSnapshotRecords.get(0).put("name", "Cooper"); + fileFirstSnapshotRecords.get(0).put("age", "42"); + + fileFirstSnapshotRecords.get(1).put("id", 1); + fileFirstSnapshotRecords.get(1).put("name", "Murphy"); + fileFirstSnapshotRecords.get(1).put("age", "16"); + + fileFirstSnapshotRecords.get(2).put("id", 2); + fileFirstSnapshotRecords.get(2).put("name", "Mann"); + fileFirstSnapshotRecords.get(2).put("age", "45"); + + fileSecondSnapshotRecords.get(0).put("id", 3); + fileSecondSnapshotRecords.get(0).put("name", "Brand"); + fileSecondSnapshotRecords.get(0).put("age", "35"); + + for (GenericData.Record record : fileFirstSnapshotRecords) { + writer.write(record); + } + + long size = writer.getDataSize(); + writer.close(); + + List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, + System.currentTimeMillis(), true, null, null)); + DeltaLog log = DeltaLog.forTable(conf, DELTA_MULTI_FILE_TABLE); + OptimisticTransaction txn = log.startTransaction(); + Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>()) + .schema(new StructType().add(new StructField("id", new IntegerType(), true)) + .add(new StructField("name", new StringType(), true)) + .add(new StructField("age", new IntegerType(), true))) + .build(); + txn.updateMetadata(metaData); + txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create"); + + Path path2 = new Path(DELTA_MULTI_FILE_TABLE, "secondFile.parquet"); + 
ParquetWriter<GenericData.Record> writer2 = + AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build(); + + for (GenericData.Record record : fileSecondSnapshotRecords) { + writer2.write(record); + } + + long size2 = writer2.getDataSize(); + writer2.close(); + + List<Action> actions2 = List.of(new AddFile("secondFile.parquet", new HashMap<>(), size2, + System.currentTimeMillis(), true, null, null)); + OptimisticTransaction txn2 = log.startTransaction(); + txn2.commit(actions2, new Operation(Operation.Name.WRITE), "deltalake-table-create"); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp new file mode 100644 index 0000000000..f21f0131fc --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL DATASET DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="my-table-empty"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.01.query.sqlpp new file mode 100644 index 0000000000..84e79142a8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-empty/deltalake-empty.01.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.00.ddl.sqlpp new file mode 100644 index 0000000000..8adab7721c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL DATASET DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/modified_delta_table"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.query.sqlpp new file mode 100644 index 0000000000..84e79142a8 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.00.ddl.sqlpp new file mode 100644 index 0000000000..61c5c3ac38 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.00.ddl.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL DATASET DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/multiple_file_delta_table"), + ("table-format" = "delta") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.01.query.sqlpp new file mode 100644 index 0000000000..bfd65819f2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-multiple-file-read/deltalake-multiple_file_read.01.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-empty/deltalake-empty.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-empty/deltalake-empty.01.adm new file mode 100644 index 0000000000..e69de29bb2 diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.adm new file mode 100644 index 0000000000..dfdc3b0a1e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-modified-data/deltalake-modified-data.01.adm @@ -0,0 +1 @@ +{ "id": 2, "data": "serene_kirby" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.2.adm new file mode 100644 index 0000000000..afae366266 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-multiple-file-read/read-data.2.adm @@ -0,0 +1,4 @@ +{ "id": 0, "name": "Cooper", "age": "42" } +{ "id": 1, "name": "Murphy", "age": "16" } +{ "id": 2, "name": "Mann", "age": "45" } +{ "id": 3, "name": "Brand", "age": "35" } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-empty/deltalake-empty.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-empty/deltalake-empty.01.adm new file mode 100644 index 0000000000..e69de29bb2 diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-modified-data/deltalake-modified-data.01.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-modified-data/deltalake-modified-data.01.adm new file mode 100644 index 0000000000..dfdc3b0a1e --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake-modified-data/deltalake-modified-data.01.adm @@ -0,0 +1 @@ +{ "id": 2, "data": "serene_kirby" } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm new file mode 100644 index 0000000000..afae366266 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/s3/deltalake/read-data.2.adm @@ -0,0 +1,4 @@ +{ "id": 0, "name": "Cooper", "age": "42" } +{ "id": 1, "name": "Murphy", "age": "16" } +{ "id": 2, "name": "Mann", "age": "45" } +{ "id": 3, "name": "Brand", "age": "35" } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index db612826e0..4e94791e2d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -301,6 +301,25 @@ </compilation-unit> </test-case> <!-- Parquet Tests End --> + <!-- Deltalake Tests Start --> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-multiple-file-read"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/deltalake-multiple-file-read</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit 
name="common/deltalake-empty"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/deltalake-empty</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-modified-data"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/deltalake-modified-data</output-dir> + </compilation-unit> + </test-case> <test-case FilePath="external-dataset"> <compilation-unit name="common/avro/avro-types/avro-map"> <placeholder name="adapter" value="S3" /> diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 3694bb2fb5..985a4f699d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -97,6 +97,7 @@ public enum ErrorCode implements IError { UNSUPPORTED_COLUMN_TYPE(67), INVALID_KEY_TYPE(68), FAILED_TO_READ_KEY(69), + INVALID_DELTA_PARAMETER(70), UNSUPPORTED_JRE(100), diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 4b7da0c1ec..b5c06e3aa2 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -41,6 +41,7 @@ 1009 = A returning expression cannot contain dataset access 37,1091 = Type mismatch: expected value of type %1$s, but got the value of type %2$s 51 = Incomparable input types: %1$s and %2$s +70 = Table Type 'delta' supports parquet file formats only. 
# Data errors 6 = Invalid format for %1$s in %2$s diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index 7f3c7ecea1..fdacc685c2 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -573,6 +573,11 @@ <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> </dependency> + <dependency> + <groupId>io.delta</groupId> + <artifactId>delta-standalone_2.12</artifactId> + <version>3.0.0</version> + </dependency> </dependencies> <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 --> <repositories> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java new file mode 100644 index 0000000000..f5a2cd5820 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import static org.apache.asterix.external.util.ExternalDataUtils.prepareDeltaTableFormat; +import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.aws.s3.S3Constants; +import org.apache.hadoop.conf.Configuration; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; + +public class AwsS3DeltaReaderFactory extends AwsS3ParquetReaderFactory { + + private static final long serialVersionUID = 1L; + + @Override + public void configure(IServiceContext serviceCtx, Map<String, String> configuration, + IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) + throws AlgebricksException, HyracksDataException { + + Configuration conf = new Configuration(); + conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); + conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); + conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME)); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + if (serviceEndpoint != null) { + conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); + } + String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' 
+ + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + + prepareDeltaTableFormat(configuration, conf, tableMetadataPath); + super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory); + } + + @Override + public Set<String> getReaderSupportedFormats() { + return Collections.singleton(ExternalDataConstants.FORMAT_DELTA); + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java index f35dce4aa5..0360c15fb1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.external.provider; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; + import java.io.IOException; import java.io.InputStream; import java.lang.reflect.InvocationTargetException; @@ -121,6 +123,10 @@ public class DatasourceFactoryProvider { if (factories.containsKey(adaptorName)) { Map<String, Class<?>> formatClassMap = factories.get(adaptorName); String format = configuration.get(ExternalDataConstants.KEY_FORMAT); + if (isDeltaTable(configuration)) { + format = configuration.get(ExternalDataConstants.TABLE_FORMAT); + return getInstance(formatClassMap.getOrDefault(format, formatClassMap.get(DEFAULT_FORMAT))); + } return getInstance(formatClassMap.getOrDefault(format, formatClassMap.get(DEFAULT_FORMAT))); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java index 3139be7f70..d8f89c22d7 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java @@ -119,7 +119,7 @@ public class ExternalDataConstants { // a string representing the format of the record (for adapters which produces records with additional information like pk or metadata) public static final String KEY_RECORD_FORMAT = "record-format"; public static final String TABLE_FORMAT = "table-format"; - public static final String ICEBERG_METADATA_LOCATION = "metadata-path"; + public static final String TABLE_METADATA_LOCATION = "metadata-path"; public static final int SUPPORTED_ICEBERG_FORMAT_VERSION = 1; public static final String KEY_META_TYPE_NAME = "meta-type-name"; public static final String KEY_ADAPTER_NAME = "adapter-name"; @@ -205,6 +205,7 @@ public class ExternalDataConstants { public static final String FORMAT_TSV = "tsv"; public static final String FORMAT_PARQUET = "parquet"; public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg"; + public static final String FORMAT_DELTA = "delta"; public static final Set<String> ALL_FORMATS; public static final Set<String> TEXTUAL_FORMATS; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index 104f301071..8d332610e8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -108,6 +108,10 @@ import org.apache.iceberg.Table; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; +import io.delta.standalone.DeltaLog; +import io.delta.standalone.Snapshot; +import io.delta.standalone.actions.AddFile; + public 
class ExternalDataUtils { private static final Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = new EnumMap<>(ATypeTag.class); private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024; @@ -469,6 +473,10 @@ public class ExternalDataUtils { } if (configuration.containsKey(ExternalDataConstants.TABLE_FORMAT)) { + if (configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_DELTA)) { + configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_NOOP); + configuration.put(ExternalDataConstants.KEY_FORMAT, ExternalDataConstants.FORMAT_PARQUET); + } prepareTableFormat(configuration); } } @@ -478,68 +486,96 @@ public class ExternalDataUtils { * * @param configuration external data configuration */ - public static void prepareTableFormat(Map<String, String> configuration) throws AlgebricksException { - // Apache Iceberg table format - if (configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_APACHE_ICEBERG)) { - Configuration conf = new Configuration(); - - String metadata_path = configuration.get(ExternalDataConstants.ICEBERG_METADATA_LOCATION); - - // If the table is in S3 - if (configuration.get(ExternalDataConstants.KEY_READER) - .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) { - - conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); - conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, - configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); - metadata_path = S3Constants.HADOOP_S3_PROTOCOL + "://" - + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' - + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); - } else if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS)) { - conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, - configuration.get(ExternalDataConstants.KEY_HDFS_URL)); - metadata_path = 
configuration.get(ExternalDataConstants.KEY_HDFS_URL) + '/' + metadata_path; - } - - HadoopTables tables = new HadoopTables(conf); + public static boolean isDeltaTable(Map<String, String> configuration) { + return configuration.containsKey(ExternalDataConstants.TABLE_FORMAT) + && configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_DELTA); + } - Table icebergTable = tables.load(metadata_path); + public static void validateDeltaTableProperties(Map<String, String> configuration) throws CompilationException { + if (!(configuration.get(ExternalDataConstants.KEY_FORMAT) == null + || configuration.get(ExternalDataConstants.KEY_FORMAT).equals(ExternalDataConstants.FORMAT_PARQUET))) { + throw new CompilationException(ErrorCode.INVALID_DELTA_PARAMETER); + } + } - if (icebergTable instanceof BaseTable) { - BaseTable baseTable = (BaseTable) icebergTable; + public static void prepareDeltaTableFormat(Map<String, String> configuration, Configuration conf, + String tableMetadataPath) { + DeltaLog deltaLog = DeltaLog.forTable(conf, tableMetadataPath); + Snapshot snapshot = deltaLog.snapshot(); + List<AddFile> dataFiles = snapshot.getAllFiles(); + StringBuilder builder = new StringBuilder(); + for (AddFile batchFile : dataFiles) { + builder.append(","); + String path = batchFile.getPath(); + builder.append(tableMetadataPath).append('/').append(path); + } + if (builder.length() > 0) { + builder.deleteCharAt(0); + } + configuration.put(ExternalDataConstants.KEY_PATH, builder.toString()); + } - if (baseTable.operations().current() - .formatVersion() != ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION) { - throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_FORMAT_VERSION, - "AsterixDB only supports Iceberg version up to " - + ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION); - } + public static void prepareIcebergTableFormat(Map<String, String> configuration, Configuration conf, + String tableMetadataPath) throws 
AlgebricksException { + HadoopTables tables = new HadoopTables(conf); + Table icebergTable = tables.load(tableMetadataPath); - try (CloseableIterable<FileScanTask> fileScanTasks = baseTable.newScan().planFiles()) { + if (icebergTable instanceof BaseTable) { + BaseTable baseTable = (BaseTable) icebergTable; - StringBuilder builder = new StringBuilder(); + if (baseTable.operations().current() + .formatVersion() != ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION) { + throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_FORMAT_VERSION, + "AsterixDB only supports Iceberg version up to " + + ExternalDataConstants.SUPPORTED_ICEBERG_FORMAT_VERSION); + } - for (FileScanTask task : fileScanTasks) { - builder.append(","); - String path = task.file().path().toString(); - builder.append(path); - } + try (CloseableIterable<FileScanTask> fileScanTasks = baseTable.newScan().planFiles()) { - if (builder.length() > 0) { - builder.deleteCharAt(0); - } + StringBuilder builder = new StringBuilder(); - configuration.put(ExternalDataConstants.KEY_PATH, builder.toString()); + for (FileScanTask task : fileScanTasks) { + builder.append(","); + String path = task.file().path().toString(); + builder.append(path); + } - } catch (IOException e) { - throw new AsterixException(ErrorCode.ERROR_READING_ICEBERG_METADATA, e); + if (builder.length() > 0) { + builder.deleteCharAt(0); } - } else { - throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_TABLE, - "Invalid iceberg base table. Please remove metadata specifiers"); + configuration.put(ExternalDataConstants.KEY_PATH, builder.toString()); + + } catch (IOException e) { + throw new AsterixException(ErrorCode.ERROR_READING_ICEBERG_METADATA, e); } + } else { + throw new AsterixException(ErrorCode.UNSUPPORTED_ICEBERG_TABLE, + "Invalid iceberg base table. 
Please remove metadata specifiers"); + } + } + + public static void prepareTableFormat(Map<String, String> configuration) throws AlgebricksException { + Configuration conf = new Configuration(); + String tableMetadataPath = configuration.get(ExternalDataConstants.TABLE_METADATA_LOCATION); + + // If the table is in S3 + if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) { + + conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); + conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); + tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + } else if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_HDFS)) { + conf.set(ExternalDataConstants.KEY_HADOOP_FILESYSTEM_URI, + configuration.get(ExternalDataConstants.KEY_HDFS_URL)); + tableMetadataPath = configuration.get(ExternalDataConstants.KEY_HDFS_URL) + '/' + tableMetadataPath; + } + // Apache Iceberg table format + if (configuration.get(ExternalDataConstants.TABLE_FORMAT).equals(ExternalDataConstants.FORMAT_APACHE_ICEBERG)) { + prepareIcebergTableFormat(configuration, conf, tableMetadataPath); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index 6a16913258..891d7f3bfa 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -23,6 +23,8 @@ import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_I import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT; import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; +import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.external.util.aws.s3.S3Constants.ACCESS_KEY_ID_FIELD_NAME; import static org.apache.asterix.external.util.aws.s3.S3Constants.ERROR_INTERNAL_ERROR; @@ -272,9 +274,11 @@ public class S3Utils { */ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector) throws CompilationException { - + if (isDeltaTable(configuration)) { + validateDeltaTableProperties(configuration); + } // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory index 88f3fcb440..2c15b5a844 100644 --- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -22,6 +22,7 @@ org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory 
org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory +org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
