This is an automated email from the ASF dual-hosted git repository. alsuliman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f091fd9930f9839f3a26497e5e1c8594be1dd3fe Author: Peeyush Gupta <[email protected]> AuthorDate: Wed Apr 23 11:29:49 2025 -0700 [ASTERIXDB-3602][EXT] Reading Delta tables with ISO8601 formatted timestamp partition - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-66229 Change-Id: I2cf9e532882dad8c778b43d72f3f1bcab37d5a00 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19693 Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Peeyush Gupta <[email protected]> --- .../external_dataset/ExternalDatasetTestUtils.java | 3 + .../deltalake/DeltaTableGenerator.java | 129 ++++++++ .../deltalake-partitioned-file-read.00.ddl.sqlpp | 44 +++ .../deltalake-partitioned-file-read.01.query.sqlpp | 22 ++ .../deltalake-partitioned-file-read.02.query.sqlpp | 22 ++ .../read-data.1.adm | 9 + .../read-data.2.adm | 3 + .../runtimets/testsuite_external_dataset_s3.xml | 6 + .../reader/aws/delta/DefaultExpressionUtils.java | 366 +++++++++++++++++++++ .../input/record/reader/aws/delta/DeltaEngine.java | 40 +++ .../reader/aws/delta/DeltaExpressionEvaluator.java | 339 +++++++++++++++++++ .../reader/aws/delta/DeltaExpressionHandler.java | 40 +++ .../reader/aws/delta/DeltaFileRecordReader.java | 215 +++++++++++- .../reader/aws/delta/DeltaPredicateEvaluator.java | 66 ++++ .../reader/aws/delta/DeltaReaderFactory.java | 3 +- .../reader/aws/delta/ElementAtEvaluator.java | 151 +++++++++ .../record/reader/aws/delta/ExpressionVisitor.java | 136 ++++++++ .../reader/aws/delta/ImplicitCastExpression.java | 267 +++++++++++++++ .../reader/aws/delta/PartitionValueEvaluator.java | 136 ++++++++ 19 files changed, 1992 insertions(+), 5 deletions(-) diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java index 4080f4c99b..d31fb66a27 100644 --- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/ExternalDatasetTestUtils.java @@ -449,6 +449,9 @@ public class ExternalDatasetTestUtils { loadDeltaDirectory(generatedDataBasePath, "/delta_file_size_one", PARQUET_FILTER, "delta-data/"); loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table", PARQUET_FILTER, "delta-data/"); loadDeltaDirectory(generatedDataBasePath, "/partitioned_delta_table/_delta_log", JSON_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/timestamp_partitioned_delta_table", PARQUET_FILTER, "delta-data/"); + loadDeltaDirectory(generatedDataBasePath, "/timestamp_partitioned_delta_table/_delta_log", JSON_FILTER, + "delta-data/"); } private static void loadDeltaDirectory(String dataBasePath, String rootPath, FilenameFilter filter, diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java index 7c8840a973..a3b20f8616 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java @@ -48,6 +48,7 @@ import io.delta.standalone.types.IntegerType; import io.delta.standalone.types.StringType; import io.delta.standalone.types.StructField; import io.delta.standalone.types.StructType; +import io.delta.standalone.types.TimestampType; public class DeltaTableGenerator { public static final String DELTA_GEN_BASEDIR = "target" + File.separatorChar + "generated_delta_files"; @@ -63,6 +64,8 @@ public class DeltaTableGenerator { "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "delta_file_size_nine"; public 
static final String DELTA_PARTITIONED_TABLE = "target" + File.separatorChar + "generated_delta_files" + File.separatorChar + "partitioned_delta_table"; + public static final String DELTA_TIMESTAMP_PARTITIONED_TABLE = "target" + File.separatorChar + + "generated_delta_files" + File.separatorChar + "timestamp_partitioned_delta_table"; public static void prepareDeltaTableContainer(Configuration conf) { File basePath = new File("."); @@ -73,6 +76,7 @@ public class DeltaTableGenerator { prepareFileSizeOne(conf); prepareFileSizeNine(conf); preparePartitionedTable(conf); + prepareTimestampPartitionedTable(conf); } public static void cleanBinaryDirectory(File localDataRoot, String binaryFilesPath) { @@ -498,4 +502,129 @@ public class DeltaTableGenerator { } } + public static void prepareTimestampPartitionedTable(Configuration conf) { + Schema schema = SchemaBuilder.record("MyRecord").fields().requiredInt("id").requiredString("name") + .requiredString("timestamp").endRecord(); + try { + List<GenericData.Record> fileFirstSnapshotRecords = List.of(new GenericData.Record(schema), + new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileSecondSnapshotRecords = + List.of(new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileThirdSnapshotRecords = + List.of(new GenericData.Record(schema), new GenericData.Record(schema)); + List<GenericData.Record> fileFourthSnapshotRecords = + List.of(new GenericData.Record(schema), new GenericData.Record(schema)); + + fileFirstSnapshotRecords.get(0).put("id", 0); + fileFirstSnapshotRecords.get(0).put("name", "Order 1"); + fileFirstSnapshotRecords.get(0).put("timestamp", "2025-01-01 00:01:20"); + + fileFirstSnapshotRecords.get(1).put("id", 1); + fileFirstSnapshotRecords.get(1).put("name", "Order 2"); + fileFirstSnapshotRecords.get(1).put("timestamp", "2025-01-01 00:01:20.00"); + + fileFirstSnapshotRecords.get(2).put("id", 2); + 
fileFirstSnapshotRecords.get(2).put("name", "Order 3"); + fileFirstSnapshotRecords.get(2).put("timestamp", "2025-01-01 00:01:20.000000"); + + fileSecondSnapshotRecords.get(0).put("id", 3); + fileSecondSnapshotRecords.get(0).put("name", "Order 10"); + fileSecondSnapshotRecords.get(0).put("timestamp", "2025-01-01T00:01:30Z"); + + fileSecondSnapshotRecords.get(1).put("id", 4); + fileSecondSnapshotRecords.get(1).put("name", "Order 11"); + fileSecondSnapshotRecords.get(1).put("timestamp", "2025-01-01T00T01:30.000000Z"); + + fileThirdSnapshotRecords.get(0).put("id", 5); + fileThirdSnapshotRecords.get(0).put("name", "Order 21"); + fileThirdSnapshotRecords.get(0).put("timestamp", "2025-01-02 00:02:20.100"); + + fileThirdSnapshotRecords.get(1).put("id", 6); + fileThirdSnapshotRecords.get(1).put("name", "Order 22"); + fileThirdSnapshotRecords.get(1).put("timestamp", "2025-01-02 00:02:20.100"); + + fileFourthSnapshotRecords.get(0).put("id", 7); + fileFourthSnapshotRecords.get(0).put("name", "Order 30"); + fileFourthSnapshotRecords.get(0).put("timestamp", "2025-01-02T00:02:30.100000Z"); + + fileFourthSnapshotRecords.get(1).put("id", 8); + fileFourthSnapshotRecords.get(1).put("name", "Order 31"); + fileFourthSnapshotRecords.get(1).put("timestamp", "2025-01-02T00:02:30.100000Z"); + + Path path = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "firstFile.parquet"); + ParquetWriter<GenericData.Record> writer = + AvroParquetWriter.<GenericData.Record> builder(path).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileFirstSnapshotRecords) { + writer.write(record); + } + long size = writer.getDataSize(); + writer.close(); + + Path path2 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "secondFile.parquet"); + ParquetWriter<GenericData.Record> writer2 = + AvroParquetWriter.<GenericData.Record> builder(path2).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileSecondSnapshotRecords) { + writer2.write(record); + } + long size2 = 
writer2.getDataSize(); + writer2.close(); + + Path path3 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "thirdFile.parquet"); + ParquetWriter<GenericData.Record> writer3 = + AvroParquetWriter.<GenericData.Record> builder(path3).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileThirdSnapshotRecords) { + writer3.write(record); + } + long size3 = writer3.getDataSize(); + writer3.close(); + + Path path4 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "fourthFile.parquet"); + ParquetWriter<GenericData.Record> writer4 = + AvroParquetWriter.<GenericData.Record> builder(path4).withConf(conf).withSchema(schema).build(); + for (GenericData.Record record : fileFourthSnapshotRecords) { + writer4.write(record); + } + long size4 = writer4.getDataSize(); + writer4.close(); + + DeltaLog log = DeltaLog.forTable(conf, DELTA_TIMESTAMP_PARTITIONED_TABLE); + OptimisticTransaction txn = log.startTransaction(); + Metadata metaData = txn.metadata().copyBuilder().partitionColumns(Arrays.asList("timestamp")) + .schema(new StructType().add(new StructField("id", new IntegerType(), true)) + .add(new StructField("name", new StringType(), true)) + .add(new StructField("timestamp", new TimestampType(), true))) + .build(); + + Map<String, String> partitionValues = new HashMap<>(); + partitionValues.put("timestamp", "2025-01-01 00:01:20"); + List<Action> actions = List.of(new AddFile("firstFile.parquet", partitionValues, size, + System.currentTimeMillis(), true, null, null)); + txn.updateMetadata(metaData); + txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "deltalake-table-create"); + + txn = log.startTransaction(); + partitionValues.clear(); + partitionValues.put("timestamp", "2025-01-01T00:01:30Z"); + actions = List.of(new AddFile("secondFile.parquet", partitionValues, size2, System.currentTimeMillis(), + true, null, null)); + txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create"); + + txn = log.startTransaction(); + 
partitionValues.clear(); + partitionValues.put("timestamp", "2025-01-02 00:02:20.100"); + actions = List.of(new AddFile("thirdFile.parquet", partitionValues, size3, System.currentTimeMillis(), true, + null, null)); + txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create"); + + txn = log.startTransaction(); + partitionValues.clear(); + partitionValues.put("timestamp", "2025-01-02T00:02:30.100000Z"); + actions = List.of(new AddFile("fourthFile.parquet", partitionValues, size4, System.currentTimeMillis(), + true, null, null)); + txn.commit(actions, new Operation(Operation.Name.WRITE), "deltalake-table-create"); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp new file mode 100644 index 0000000000..da4b0e82ab --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.00.ddl.sqlpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +DROP DATAVERSE test IF EXISTS; + CREATE DATAVERSE test; + + USE test; + + + CREATE TYPE DeltalakeTableType as { + }; + + CREATE EXTERNAL COLLECTION DeltalakeDataset(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/timestamp_partitioned_delta_table"), + ("table-format" = "delta") + ); + + CREATE EXTERNAL COLLECTION DeltalakeDataset2(DeltalakeTableType) USING %adapter% + ( + %template%, + ("container"="playground"), + ("definition"="delta-data/timestamp_partitioned_delta_table"), + ("table-format" = "delta"), + ("timestamp-to-long" = "false") + ); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp new file mode 100644 index 0000000000..bfd65819f2 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.01.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset as ds order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp new file mode 100644 index 0000000000..a84d381513 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-dataset/common/deltalake-timestamp-partitioned-file-read/deltalake-partitioned-file-read.02.query.sqlpp @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + USE test; + + SELECT element ds FROM DeltalakeDataset2 as ds where ds.timestamp=datetime("2025-01-01T00:01:20Z") order by ds.id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm new file mode 100644 index 0000000000..dabfb4fecf --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.1.adm @@ -0,0 +1,9 @@ +{ "id": 0, "name": "Order 1", "timestamp": 1735689680000 } +{ "id": 1, "name": "Order 2", "timestamp": 1735689680000 } +{ "id": 2, "name": "Order 3", "timestamp": 1735689680000 } +{ "id": 3, "name": "Order 10", "timestamp": 1735689690000 } +{ "id": 4, "name": "Order 11", "timestamp": 1735689690000 } +{ "id": 5, "name": "Order 21", "timestamp": 1735776140100 } +{ "id": 6, "name": "Order 22", "timestamp": 1735776140100 } +{ "id": 7, "name": "Order 30", "timestamp": 1735776150100 } +{ "id": 8, "name": "Order 31", "timestamp": 1735776150100 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm new file mode 
100644 index 0000000000..c185038fa0 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-dataset/common/deltalake-timestamp-partitioned-file-read/read-data.2.adm @@ -0,0 +1,3 @@ +{ "id": 0, "name": "Order 1", "timestamp": datetime("2025-01-01T00:01:20.000") } +{ "id": 1, "name": "Order 2", "timestamp": datetime("2025-01-01T00:01:20.000") } +{ "id": 2, "name": "Order 3", "timestamp": datetime("2025-01-01T00:01:20.000") \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 38ebf01c97..c9f9da661d 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -638,6 +638,12 @@ <output-dir compare="Text">common/deltalake-partitioned-file-read</output-dir> </compilation-unit> </test-case> + <test-case FilePath="external-dataset"> + <compilation-unit name="common/deltalake-timestamp-partitioned-file-read"> + <placeholder name="adapter" value="S3" /> + <output-dir compare="Text">common/deltalake-timestamp-partitioned-file-read</output-dir> + </compilation-unit> + </test-case> <test-case FilePath="external-dataset"> <compilation-unit name="common/avro/avro-types/avro-map"> <placeholder name="adapter" value="S3" /> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java new file mode 100644 index 0000000000..560360c840 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import io.delta.kernel.data.ArrayValue; +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.TimestampNTZType; +import io.delta.kernel.types.TimestampType; + +/** + * Utility methods used by the default expression 
evaluator. + */ +class DefaultExpressionUtils { + private DefaultExpressionUtils() { + } + + static final Comparator<String> STRING_COMPARATOR = (leftOp, rightOp) -> { + byte[] leftBytes = leftOp.getBytes(StandardCharsets.UTF_8); + byte[] rightBytes = rightOp.getBytes(StandardCharsets.UTF_8); + int i = 0; + while (i < leftBytes.length && i < rightBytes.length) { + if (leftBytes[i] != rightBytes[i]) { + return Byte.toUnsignedInt(leftBytes[i]) - Byte.toUnsignedInt(rightBytes[i]); + } + i++; + } + return Integer.compare(leftBytes.length, rightBytes.length); + }; + + static final Comparator<byte[]> BINARY_COMPARTOR = (leftOp, rightOp) -> { + int i = 0; + while (i < leftOp.length && i < rightOp.length) { + if (leftOp[i] != rightOp[i]) { + return Byte.toUnsignedInt(leftOp[i]) - Byte.toUnsignedInt(rightOp[i]); + } + i++; + } + return Integer.compare(leftOp.length, rightOp.length); + }; + + /** + * Utility method that calculates the nullability result from given two vectors. Result is + * null if at least one side is a null. + */ + static boolean[] evalNullability(ColumnVector left, ColumnVector right) { + int numRows = left.getSize(); + boolean[] nullability = new boolean[numRows]; + for (int rowId = 0; rowId < numRows; rowId++) { + nullability[rowId] = left.isNullAt(rowId) || right.isNullAt(rowId); + } + return nullability; + } + + /** + * Wraps a child vector as a boolean {@link ColumnVector} with the given value and nullability + * accessors. 
+ */ + static ColumnVector booleanWrapperVector(ColumnVector childVector, Function<Integer, Boolean> valueAccessor, + Function<Integer, Boolean> nullabilityAccessor) { + + return new ColumnVector() { + + @Override + public DataType getDataType() { + return BooleanType.BOOLEAN; + } + + @Override + public int getSize() { + return childVector.getSize(); + } + + @Override + public void close() { + childVector.close(); + } + + @Override + public boolean isNullAt(int rowId) { + return nullabilityAccessor.apply(rowId); + } + + @Override + public boolean getBoolean(int rowId) { + return valueAccessor.apply(rowId); + } + }; + } + + /** + * Utility method to compare the left and right according to the natural ordering + * and return an integer array where each row contains the comparison result (-1, 0, 1) for + * corresponding rows in the input vectors compared. + * <p> + * Only primitive data types are supported. + */ + static int[] compare(ColumnVector left, ColumnVector right) { + checkArgument(left.getSize() == right.getSize(), "Left and right operand have different vector sizes."); + DataType dataType = left.getDataType(); + + int numRows = left.getSize(); + int[] result = new int[numRows]; + if (dataType instanceof BooleanType) { + compareBoolean(left, right, result); + } else if (dataType instanceof ByteType) { + compareByte(left, right, result); + } else if (dataType instanceof ShortType) { + compareShort(left, right, result); + } else if (dataType instanceof IntegerType || dataType instanceof DateType) { + compareInt(left, right, result); + } else if (dataType instanceof LongType || dataType instanceof TimestampType + || dataType instanceof TimestampNTZType) { + compareLong(left, right, result); + } else if (dataType instanceof FloatType) { + compareFloat(left, right, result); + } else if (dataType instanceof DoubleType) { + compareDouble(left, right, result); + } else if (dataType instanceof DecimalType) { + compareDecimal(left, right, result); + } else if 
(dataType instanceof StringType) { + compareString(left, right, result); + } else if (dataType instanceof BinaryType) { + compareBinary(left, right, result); + } else { + throw new UnsupportedOperationException(dataType + " can not be compared."); + } + return result; + } + + static void compareBoolean(ColumnVector left, ColumnVector right, int[] result) { + for (int rowId = 0; rowId < left.getSize(); rowId++) { + if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { + result[rowId] = Boolean.compare(left.getBoolean(rowId), right.getBoolean(rowId)); + } + } + } + + static void compareByte(ColumnVector left, ColumnVector right, int[] result) { + for (int rowId = 0; rowId < left.getSize(); rowId++) { + if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { + result[rowId] = Byte.compare(left.getByte(rowId), right.getByte(rowId)); + } + } + } + + static void compareShort(ColumnVector left, ColumnVector right, int[] result) { + for (int rowId = 0; rowId < left.getSize(); rowId++) { + if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { + result[rowId] = Short.compare(left.getShort(rowId), right.getShort(rowId)); + } + } + } + + static void compareInt(ColumnVector left, ColumnVector right, int[] result) { + for (int rowId = 0; rowId < left.getSize(); rowId++) { + if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { + result[rowId] = Integer.compare(left.getInt(rowId), right.getInt(rowId)); + } + } + } + + static void compareLong(ColumnVector left, ColumnVector right, int[] result) { + for (int rowId = 0; rowId < left.getSize(); rowId++) { + if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { + result[rowId] = Long.compare(left.getLong(rowId), right.getLong(rowId)); + } + } + } + + static void compareFloat(ColumnVector left, ColumnVector right, int[] result) { + for (int rowId = 0; rowId < left.getSize(); rowId++) { + if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { + result[rowId] = Float.compare(left.getFloat(rowId), right.getFloat(rowId)); + } + } + } + 
+    /**
+     * Writes {@link Double#compare(double, double)} for every row where both operands are non-null
+     * into {@code result}; rows with a null operand are left as they are.
+     */
+    static void compareDouble(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = Double.compare(left.getDouble(row), right.getDouble(row));
+        }
+    }
+
+    /**
+     * Writes the {@code STRING_COMPARATOR} result for every row where both operands are non-null into
+     * {@code result}; rows with a null operand are left as they are.
+     */
+    static void compareString(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = STRING_COMPARATOR.compare(left.getString(row), right.getString(row));
+        }
+    }
+
+    /**
+     * Writes the natural {@link BigDecimal} ordering for every row where both operands are non-null
+     * into {@code result}; rows with a null operand are left as they are.
+     */
+    static void compareDecimal(ColumnVector left, ColumnVector right, int[] result) {
+        Comparator<BigDecimal> decimalOrder = Comparator.naturalOrder();
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = decimalOrder.compare(left.getDecimal(row), right.getDecimal(row));
+        }
+    }
+
+    /**
+     * Writes the {@code BINARY_COMPARTOR} result for every row where both operands are non-null into
+     * {@code result}; rows with a null operand are left as they are.
+     */
+    static void compareBinary(ColumnVector left, ColumnVector right, int[] result) {
+        for (int row = 0; row < left.getSize(); row++) {
+            if (left.isNullAt(row) || right.isNullAt(row)) {
+                continue;
+            }
+            result[row] = BINARY_COMPARTOR.compare(left.getBinary(row), right.getBinary(row));
+        }
+    }
+
+    /**
+     * Returns the {@code index}-th child of {@code expression}.
+     */
+    static Expression childAt(Expression expression, int index) {
+        return expression.getChildren().get(index);
+    }
+
+    /**
+     * Builds a view that merges several column vectors row by row: for each row id,
+     * {@code idxToReturn} picks which entry of {@code vectors} answers the accessor calls for that row.
+     * Closing the view closes every vector in {@code vectors}.
+     *
+     * @param vectors non-empty list of vectors of the same data type; the first one supplies the data
+     *            type and size of the view
+     * @param idxToReturn maps a row id to the index in {@code vectors} of the vector to read from
+     * @return the combined column vector
+     */
+    static ColumnVector combinationVector(List<ColumnVector> vectors, Function<Integer, Integer> idxToReturn) {
+        return new ColumnVector() {
+            // One-entry cache of the last resolved row. Callers usually call isNullAt(rowId) and then a
+            // value accessor for the same row, so remembering a single row is sufficient.
+            private int cachedRowId = -1;
+            private ColumnVector cachedVector = null;
+
+            @Override
+            public DataType getDataType() {
+                return vectors.get(0).getDataType();
+            }
+
+            @Override
+            public int getSize() {
+                return vectors.get(0).getSize();
+            }
+
+            @Override
+            public void close() {
+                Utils.closeCloseables(vectors.toArray(new ColumnVector[0]));
+            }
+
+            @Override
+            public boolean isNullAt(int rowId) {
+                return getVector(rowId).isNullAt(rowId);
+            }
+
+            @Override
+            public boolean getBoolean(int rowId) {
+                return getVector(rowId).getBoolean(rowId);
+            }
+
+            @Override
+            public byte getByte(int rowId) {
+                return getVector(rowId).getByte(rowId);
+            }
+
+            @Override
+            public short getShort(int rowId) {
+                return getVector(rowId).getShort(rowId);
+            }
+
+            @Override
+            public int getInt(int rowId) {
+                return getVector(rowId).getInt(rowId);
+            }
+
+            @Override
+            public long getLong(int rowId) {
+                return getVector(rowId).getLong(rowId);
+            }
+
+            @Override
+            public float getFloat(int rowId) {
+                return getVector(rowId).getFloat(rowId);
+            }
+
+            @Override
+            public double getDouble(int rowId) {
+                return getVector(rowId).getDouble(rowId);
+            }
+
+            @Override
+            public byte[] getBinary(int rowId) {
+                return getVector(rowId).getBinary(rowId);
+            }
+
+            @Override
+            public String getString(int rowId) {
+                return getVector(rowId).getString(rowId);
+            }
+
+            @Override
+            public BigDecimal getDecimal(int rowId) {
+                return getVector(rowId).getDecimal(rowId);
+            }
+
+            @Override
+            public MapValue getMap(int rowId) {
+                return getVector(rowId).getMap(rowId);
+            }
+
+            @Override
+            public ArrayValue getArray(int rowId) {
+                return getVector(rowId).getArray(rowId);
+            }
+
+            @Override
+            public ColumnVector getChild(int ordinal) {
+                List<ColumnVector> children =
+                        vectors.stream().map(vector -> vector.getChild(ordinal)).collect(Collectors.toList());
+                return combinationVector(children, idxToReturn);
+            }
+
+            private ColumnVector getVector(int rowId) {
+                if (rowId != cachedRowId) {
+                    cachedRowId = rowId;
+                    cachedVector = vectors.get(idxToReturn.apply(rowId));
+                }
+                return cachedVector;
+            }
+        };
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
new file mode 100644
index 0000000000..1b4e12e62b
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.ExpressionHandler;
+
+public class DeltaEngine extends DefaultEngine {
+
+    protected DeltaEngine(Configuration configuration) {
+        super(configuration);
+    }
+
+    @Override
+    public ExpressionHandler getExpressionHandler() {
+        return new DeltaExpressionHandler();
+    }
+
+    public static DeltaEngine create(Configuration configuration) {
+        return new DeltaEngine(configuration);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
new file mode 100644
index 0000000000..b3082b5e8a
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static io.delta.kernel.internal.util.ExpressionUtils.getLeft;
+import static io.delta.kernel.internal.util.ExpressionUtils.getRight;
+import static io.delta.kernel.internal.util.ExpressionUtils.getUnaryChild;
+import static io.delta.kernel.internal.util.Preconditions.checkArgument;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.booleanWrapperVector;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.childAt;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.compare;
+import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.evalNullability;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.internal.data.vector.DefaultBooleanVector;
+import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
+import io.delta.kernel.defaults.internal.expressions.DefaultExpressionEvaluator;
+import io.delta.kernel.expressions.AlwaysFalse;
+import io.delta.kernel.expressions.AlwaysTrue;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Or;
+import io.delta.kernel.expressions.PartitionValueExpression;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.ScalarExpression;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
+
+/**
+ * Expression evaluator that evaluates expressions with the {@code ExpressionEvalVisitor} defined in
+ * this class rather than with the evaluation logic inherited from {@link DefaultExpressionEvaluator}.
+ * <p>
+ * NOTE(review): {@link #eval(ColumnarBatch)} evaluates the expression exactly as passed to the
+ * constructor. If the superclass constructor rewrites it (for example by inserting implicit casts),
+ * that rewritten form is not used here; confirm this is intended.
+ */
+public class DeltaExpressionEvaluator extends DefaultExpressionEvaluator {
+
+    private final Expression expression;
+
+    /**
+     * @param structType input schema, forwarded to {@link DefaultExpressionEvaluator}
+     * @param expression expression evaluated by {@link #eval(ColumnarBatch)}
+     * @param dataType output type, forwarded to {@link DefaultExpressionEvaluator}
+     */
+    public DeltaExpressionEvaluator(StructType structType, Expression expression, DataType dataType) {
+        super(structType, expression, dataType);
+        this.expression = expression;
+    }
+
+    /**
+     * Evaluates the expression given at construction time on {@code input}.
+     *
+     * @return the resulting column vector
+     */
+    @Override
+    public ColumnVector eval(ColumnarBatch input) {
+        return new ExpressionEvalVisitor(input).visit(expression);
+    }
+
+    /**
+     * Implementation of {@link ExpressionVisitor} to evaluate expression on a
+     * {@link ColumnarBatch}.
+     */
+    private static class ExpressionEvalVisitor extends ExpressionVisitor<ColumnVector> {
+        private final ColumnarBatch input;
+
+        ExpressionEvalVisitor(ColumnarBatch input) {
+            this.input = input;
+        }
+
+        /*
+        | Operand 1 | Operand 2 | `AND`      | `OR`       |
+        |-----------|-----------|------------|------------|
+        | True      | True      | True       | True       |
+        | True      | False     | False      | True       |
+        | True      | NULL      | NULL       | True       |
+        | False     | True      | False      | True       |
+        | False     | False     | False      | False      |
+        | False     | NULL      | False      | NULL       |
+        | NULL      | True      | NULL       | True       |
+        | NULL      | False     | False      | NULL       |
+        | NULL      | NULL      | NULL       | NULL       |
+        */
+        @Override
+        ColumnVector visitAnd(And and) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(and);
+            ColumnVector left = argResults.leftResult;
+            ColumnVector right = argResults.rightResult;
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = new boolean[numRows];
+            for (int rowId = 0; rowId < numRows; rowId++) {
+                boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId);
+                boolean rightIsTrue = !right.isNullAt(rowId) && right.getBoolean(rowId);
+                boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId);
+                boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId);
+
+                if (leftIsFalse || rightIsFalse) {
+                    nullability[rowId] = false;
+                    result[rowId] = false;
+                } else if (leftIsTrue && rightIsTrue) {
+                    nullability[rowId] = false;
+                    result[rowId] = true;
+                } else {
+                    nullability[rowId] = true;
+                    // result[rowId] is undefined when nullability[rowId] = true
+                }
+            }
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        @Override
+        ColumnVector visitOr(Or or) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(or);
+            ColumnVector left = argResults.leftResult;
+            ColumnVector right = argResults.rightResult;
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = new boolean[numRows];
+            for (int rowId = 0; rowId < numRows; rowId++) {
+                boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId);
+                boolean rightIsTrue = !right.isNullAt(rowId) && right.getBoolean(rowId);
+                boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId);
+                boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId);
+
+                if (leftIsTrue || rightIsTrue) {
+                    nullability[rowId] = false;
+                    result[rowId] = true;
+                } else if (leftIsFalse && rightIsFalse) {
+                    nullability[rowId] = false;
+                    result[rowId] = false;
+                } else {
+                    nullability[rowId] = true;
+                    // result[rowId] is undefined when nullability[rowId] = true
+                }
+            }
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        @Override
+        ColumnVector visitAlwaysTrue(AlwaysTrue alwaysTrue) {
+            return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), true);
+        }
+
+        @Override
+        ColumnVector visitAlwaysFalse(AlwaysFalse alwaysFalse) {
+            return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), false);
+        }
+
+        @Override
+        ColumnVector visitComparator(Predicate predicate) {
+            PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(predicate);
+
+            int numRows = argResults.rowCount;
+            boolean[] result = new boolean[numRows];
+            boolean[] nullability = evalNullability(argResults.leftResult, argResults.rightResult);
+            // The per-type compare helpers leave rows with a null operand unset; those rows are expected
+            // to be reported as null through the nullability computed above.
+            int[] compareResult = compare(argResults.leftResult, argResults.rightResult);
+            switch (predicate.getName()) {
+                case "=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] == 0;
+                    }
+                    break;
+                case ">":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] > 0;
+                    }
+                    break;
+                case ">=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] >= 0;
+                    }
+                    break;
+                case "<":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] < 0;
+                    }
+                    break;
+                case "<=":
+                    for (int rowId = 0; rowId < numRows; rowId++) {
+                        result[rowId] = compareResult[rowId] <= 0;
+                    }
+                    break;
+                default:
+                    // We should never reach this based on the ExpressionVisitor
+                    throw new IllegalStateException(
+                            String.format("%s is not a recognized comparator", predicate.getName()));
+            }
+
+            return new DefaultBooleanVector(numRows, Optional.of(nullability), result);
+        }
+
+        @Override
+        ColumnVector visitLiteral(Literal literal) {
+            DataType dataType = literal.getDataType();
+            // Only these scalar literal types can be materialized as constant vectors.
+            if (dataType instanceof BooleanType || dataType instanceof ByteType || dataType instanceof ShortType
+                    || dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType
+                    || dataType instanceof DoubleType || dataType instanceof StringType
+                    || dataType instanceof BinaryType || dataType instanceof DecimalType || dataType instanceof DateType
+                    || dataType instanceof TimestampType || dataType instanceof TimestampNTZType) {
+                return new DefaultConstantVector(dataType, input.getSize(), literal.getValue());
+            }
+
+            throw new UnsupportedOperationException("unsupported expression encountered: " + literal);
+        }
+
+        @Override
+        ColumnVector visitColumn(Column column) {
+            String[] names = column.getNames();
+            DataType currentType = input.getSchema();
+            ColumnVector columnVector = null;
+            // Resolve the (possibly nested) column by walking one struct level per name component.
+            for (int level = 0; level < names.length; level++) {
+                assertColumnExists(currentType instanceof StructType, input.getSchema(), column);
+                StructType structSchema = ((StructType) currentType);
+                int ordinal = structSchema.indexOf(names[level]);
+                assertColumnExists(ordinal != -1, input.getSchema(), column);
+                currentType = structSchema.at(ordinal).getDataType();
+
+                if (level == 0) {
+                    columnVector = input.getColumnVector(ordinal);
+                } else {
+                    columnVector = columnVector.getChild(ordinal);
+                }
+            }
+            assertColumnExists(columnVector != null, input.getSchema(), column);
+            return columnVector;
+        }
+
+        @Override
+        ColumnVector visitCast(ImplicitCastExpression cast) {
+            ColumnVector inputResult = visit(cast.getInput());
+            return cast.eval(inputResult);
+        }
+
+        @Override
+        ColumnVector visitPartitionValue(PartitionValueExpression partitionValue) {
+            ColumnVector input = visit(partitionValue.getInput());
+            return PartitionValueEvaluator.eval(input, partitionValue.getDataType());
+        }
+
+        @Override
+        ColumnVector visitElementAt(ScalarExpression elementAt) {
+            ColumnVector map = visit(childAt(elementAt, 0));
+            ColumnVector lookupKey = visit(childAt(elementAt, 1));
+            return ElementAtEvaluator.eval(map, lookupKey);
+        }
+
+        @Override
+        ColumnVector visitNot(Predicate predicate) {
+            ColumnVector childResult = visit(childAt(predicate, 0));
+            return booleanWrapperVector(childResult, rowId -> !childResult.getBoolean(rowId),
+                    rowId -> childResult.isNullAt(rowId));
+        }
+
+        @Override
+        ColumnVector visitIsNotNull(Predicate predicate) {
+            ColumnVector childResult = visit(childAt(predicate, 0));
+            return booleanWrapperVector(childResult, rowId -> !childResult.isNullAt(rowId), rowId -> false);
+        }
+
+        @Override
+        ColumnVector visitIsNull(Predicate predicate) {
+            ColumnVector childResult = visit(getUnaryChild(predicate));
+            return booleanWrapperVector(childResult, rowId -> childResult.isNullAt(rowId), rowId -> false);
+        }
+
+        @Override
+        ColumnVector visitCoalesce(ScalarExpression coalesce) {
+            List<ColumnVector> childResults =
+                    coalesce.getChildren().stream().map(this::visit).collect(Collectors.toList());
+            return DefaultExpressionUtils.combinationVector(childResults, rowId -> {
+                for (int idx = 0; idx < childResults.size(); idx++) {
+                    if (!childResults.get(idx).isNullAt(rowId)) {
+                        return idx;
+                    }
+                }
+                return 0; // If all are null then any idx suffices
+            });
+        }
+
+        /**
+         * Utility method to evaluate inputs to the binary input expression. Also validates the
+         * evaluated expression result {@link ColumnVector}s are of the same size.
+         *
+         * @param predicate binary predicate whose left and right children are evaluated
+         * @return Triplet of (result vector size, left operand result, right operand result)
+         * @throws IllegalArgumentException if the two operand results differ in size
+         */
+        private PredicateChildrenEvalResult evalBinaryExpressionChildren(Predicate predicate) {
+            ColumnVector left = visit(getLeft(predicate));
+            ColumnVector right = visit(getRight(predicate));
+            checkArgument(left.getSize() == right.getSize(),
+                    "Left and right operand returned different results: left=%d, right=%d", left.getSize(),
+                    right.getSize());
+            return new PredicateChildrenEvalResult(left.getSize(), left, right);
+        }
+    }
+
+    /**
+     * Encapsulates children expression result of binary input predicate
+     */
+    private static class PredicateChildrenEvalResult {
+        public final int rowCount;
+        public final ColumnVector leftResult;
+        public final ColumnVector rightResult;
+
+        PredicateChildrenEvalResult(int rowCount, ColumnVector leftResult, ColumnVector rightResult) {
+            this.rowCount = rowCount;
+            this.leftResult = leftResult;
+            this.rightResult = rightResult;
+        }
+    }
+
+    private static void assertColumnExists(boolean condition, StructType schema, Column column) {
+        if (!condition) {
+            throw new IllegalArgumentException(
+                    String.format("%s doesn't exist in input data schema: %s", column, schema));
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
new file mode 100644
index 0000000000..d9f5251420
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import io.delta.kernel.defaults.engine.DefaultExpressionHandler;
+import io.delta.kernel.expressions.Expression;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.PredicateEvaluator;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.StructType;
+
+public class DeltaExpressionHandler extends DefaultExpressionHandler {
+
+    @Override
+    public ExpressionEvaluator getEvaluator(StructType inputSchema, Expression expression, DataType outputType) {
+        return new DeltaExpressionEvaluator(inputSchema, expression, outputType);
+    }
+
+    @Override
+    public PredicateEvaluator getPredicateEvaluator(StructType inputSchema, Predicate predicate) {
+        return new DeltaPredicateEvaluator(inputSchema, predicate);
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index 121a76b2e8..44dfdf006c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -22,8 +22,15 @@ import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -39,15 +46,40 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.hdfs.dataflow.ConfFactory;
 
 import io.delta.kernel.Scan;
+import io.delta.kernel.data.ColumnVector;
 import io.delta.kernel.data.ColumnarBatch;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
+import io.delta.kernel.engine.ExpressionHandler;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Literal;
 import io.delta.kernel.expressions.Predicate;
 import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.internal.actions.DeletionVectorDescriptor;
 import io.delta.kernel.internal.data.ScanStateRow;
+import io.delta.kernel.internal.data.SelectionColumnVector;
+import io.delta.kernel.internal.deletionvectors.DeletionVectorUtils;
+import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray;
+import io.delta.kernel.internal.util.ColumnMapping;
+import io.delta.kernel.internal.util.InternalUtils;
+import io.delta.kernel.internal.util.Tuple2;
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
 import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampNTZType;
+import io.delta.kernel.types.TimestampType;
 import io.delta.kernel.utils.CloseableIterator;
 import io.delta.kernel.utils.FileStatus;
 
@@ -74,7 +106,7 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
     public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
             String filterExpressionStr) throws HyracksDataException {
         JobConf conf = config.getConf();
-        this.engine = DefaultEngine.create(conf);
+        this.engine = DeltaEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
@@ -94,7 +126,7 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
         try {
             this.physicalDataIter = engine.getParquetHandler()
                     .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate);
-            this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
+            this.dataIter = transformPhysicalData(engine, scanState, scanFile, physicalDataIter);
             if (dataIter.hasNext()) {
                 rows = dataIter.next().getRows();
             }
@@ -165,4 +197,221 @@ public class DeltaFileRecordReader implements IRecordReader<Row> {
     public boolean handleException(Throwable th) {
         return false;
     }
+
+    /**
+     * Converts the physical batches read from one scan file into logical batches. For each batch this
+     * applies the file's deletion vector (if any) as a selection vector, drops the row-index metadata
+     * column, attaches partition columns via {@link #withPartitionColumns}, and switches to the logical
+     * schema when the column mapping mode is {@code COLUMN_MAPPING_MODE_NAME} or
+     * {@code COLUMN_MAPPING_MODE_ID}. Used instead of {@code Scan.transformPhysicalData} so that
+     * partition values are parsed by {@link #literalForPartitionValue}, which also accepts ISO-8601
+     * timestamps.
+     *
+     * @param engine engine used to resolve schemas, load deletion vectors and evaluate partition values
+     * @param scanState scan state row (physical/logical schema, table root, column mapping mode)
+     * @param scanFile row describing the data file being read (deletion vector, partition values)
+     * @param physicalDataIter batches read from the data file; closed when the returned iterator is closed
+     * @return iterator of logical batches, each paired with its optional selection vector
+     */
+    static CloseableIterator<FilteredColumnarBatch> transformPhysicalData(Engine engine, Row scanState, Row scanFile,
+            CloseableIterator<ColumnarBatch> physicalDataIter) throws IOException {
+        return new CloseableIterator<FilteredColumnarBatch>() {
+            boolean inited = false;
+
+            // initialized as part of init()
+            StructType physicalReadSchema = null;
+            StructType logicalReadSchema = null;
+            String tablePath = null;
+
+            RoaringBitmapArray currBitmap = null;
+            DeletionVectorDescriptor currDV = null;
+
+            private void initIfRequired() {
+                if (inited) {
+                    return;
+                }
+                physicalReadSchema = ScanStateRow.getPhysicalSchema(engine, scanState);
+                logicalReadSchema = ScanStateRow.getLogicalSchema(engine, scanState);
+
+                tablePath = ScanStateRow.getTableRoot(scanState);
+                inited = true;
+            }
+
+            @Override
+            public void close() throws IOException {
+                physicalDataIter.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                initIfRequired();
+                return physicalDataIter.hasNext();
+            }
+
+            @Override
+            public FilteredColumnarBatch next() {
+                initIfRequired();
+                ColumnarBatch nextDataBatch = physicalDataIter.next();
+
+                DeletionVectorDescriptor dv = InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFile);
+
+                int rowIndexOrdinal = nextDataBatch.getSchema().indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME);
+
+                // Get the selectionVector if DV is present
+                Optional<ColumnVector> selectionVector;
+                if (dv == null) {
+                    selectionVector = Optional.empty();
+                } else {
+                    if (rowIndexOrdinal == -1) {
+                        throw new IllegalArgumentException(
+                                "Row index column is not " + "present in the data read from the Parquet file.");
+                    }
+                    // Load the deletion-vector bitmap only when the descriptor differs from the cached one.
+                    if (!dv.equals(currDV)) {
+                        Tuple2<DeletionVectorDescriptor, RoaringBitmapArray> dvInfo =
+                                DeletionVectorUtils.loadNewDvAndBitmap(engine, tablePath, dv);
+                        this.currDV = dvInfo._1;
+                        this.currBitmap = dvInfo._2;
+                    }
+                    ColumnVector rowIndexVector = nextDataBatch.getColumnVector(rowIndexOrdinal);
+                    selectionVector = Optional.of(new SelectionColumnVector(currBitmap, rowIndexVector));
+                }
+                if (rowIndexOrdinal != -1) {
+                    nextDataBatch = nextDataBatch.withDeletedColumnAt(rowIndexOrdinal);
+                }
+
+                // Add partition columns
+                nextDataBatch = withPartitionColumns(engine.getExpressionHandler(), nextDataBatch,
+                        InternalScanFileUtils.getPartitionValues(scanFile), physicalReadSchema);
+
+                // Change back to logical schema
+                String columnMappingMode = ScanStateRow.getColumnMappingMode(scanState);
+                switch (columnMappingMode) {
+                    case ColumnMapping.COLUMN_MAPPING_MODE_NAME:
+                    case ColumnMapping.COLUMN_MAPPING_MODE_ID:
+                        nextDataBatch = nextDataBatch.withNewSchema(logicalReadSchema);
+                        break;
+                    case ColumnMapping.COLUMN_MAPPING_MODE_NONE:
+                        break;
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Column mapping mode is not yet supported: " + columnMappingMode);
+                }
+
+                return new FilteredColumnarBatch(nextDataBatch, selectionVector);
+            }
+        };
+    }
+
+    /**
+     * Inserts a column holding the partition value for every field of {@code schemaWithPartitionCols}
+     * that has an entry in {@code partitionValues}, at that field's position in the schema.
+     *
+     * @param expressionHandler handler whose evaluator turns each partition literal into a column vector
+     * @param dataBatch batch to add the partition columns to
+     * @param partitionValues partition column name to string value; may be {@code null} or empty
+     * @param schemaWithPartitionCols schema that includes the partition columns
+     * @return the batch with partition columns inserted, or {@code dataBatch} itself if there are none
+     */
+    public static ColumnarBatch withPartitionColumns(ExpressionHandler expressionHandler, ColumnarBatch dataBatch,
+            Map<String, String> partitionValues, StructType schemaWithPartitionCols) {
+        if (partitionValues == null || partitionValues.isEmpty()) {
+            // no partition column vectors to attach to.
+            return dataBatch;
+        }
+
+        for (int colIdx = 0; colIdx < schemaWithPartitionCols.length(); colIdx++) {
+            StructField structField = schemaWithPartitionCols.at(colIdx);
+
+            if (partitionValues.containsKey(structField.getName())) {
+                // Create a partition vector
+
+                ExpressionEvaluator evaluator = expressionHandler.getEvaluator(dataBatch.getSchema(),
+                        literalForPartitionValue(structField.getDataType(), partitionValues.get(structField.getName())),
+                        structField.getDataType());
+
+                ColumnVector partitionVector = evaluator.eval(dataBatch);
+                dataBatch = dataBatch.withNewColumn(colIdx, structField, partitionVector);
+            }
+        }
+
+        return dataBatch;
+    }
+
+    /**
+     * Parses a partition value string into a {@link Literal} of the given type.
+     * <p>
+     * Timestamps are first parsed with {@link Timestamp#valueOf(String)}
+     * ({@code yyyy-[m]m-[d]d hh:mm:ss[.f...]}); if that fails they are parsed as an ISO-8601 instant
+     * with {@link Instant#parse(CharSequence)} (e.g. {@code 2024-01-01T00:00:00Z}). Binary values are
+     * the UTF-8 bytes of the string.
+     *
+     * @param dataType type of the partition column
+     * @param partitionValue raw partition value; {@code null} yields a null literal of {@code dataType}
+     * @return literal holding the parsed value
+     * @throws UnsupportedOperationException if {@code dataType} is not a supported partition type
+     */
+    protected static Literal literalForPartitionValue(DataType dataType, String partitionValue) {
+        if (partitionValue == null) {
+            return Literal.ofNull(dataType);
+        }
+
+        if (dataType instanceof BooleanType) {
+            return Literal.ofBoolean(Boolean.parseBoolean(partitionValue));
+        }
+        if (dataType instanceof ByteType) {
+            return Literal.ofByte(Byte.parseByte(partitionValue));
+        }
+        if (dataType instanceof ShortType) {
+            return Literal.ofShort(Short.parseShort(partitionValue));
+        }
+        if (dataType instanceof IntegerType) {
+            return Literal.ofInt(Integer.parseInt(partitionValue));
+        }
+        if (dataType instanceof LongType) {
+            return Literal.ofLong(Long.parseLong(partitionValue));
+        }
+        if (dataType instanceof FloatType) {
+            return Literal.ofFloat(Float.parseFloat(partitionValue));
+        }
+        if (dataType instanceof DoubleType) {
+            return Literal.ofDouble(Double.parseDouble(partitionValue));
+        }
+        if (dataType instanceof StringType) {
+            return Literal.ofString(partitionValue);
+        }
+        if (dataType instanceof BinaryType) {
+            return Literal.ofBinary(partitionValue.getBytes(StandardCharsets.UTF_8));
+        }
+        if (dataType instanceof DateType) {
+            return Literal.ofDate(InternalUtils.daysSinceEpoch(Date.valueOf(partitionValue)));
+        }
+        if (dataType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) dataType;
+            return Literal.ofDecimal(new BigDecimal(partitionValue), decimalType.getPrecision(),
+                    decimalType.getScale());
+        }
+        if (dataType instanceof TimestampType) {
+            try {
+                Timestamp timestamp = Timestamp.valueOf(partitionValue);
+                return Literal.ofTimestamp(InternalUtils.microsSinceEpoch(timestamp));
+            } catch (IllegalArgumentException e) {
+                Instant instant = Instant.parse(partitionValue);
+                return Literal.ofTimestamp(ChronoUnit.MICROS.between(Instant.EPOCH, instant));
+            }
+        }
+        if (dataType instanceof TimestampNTZType) {
+            // Values in java.sql.Timestamp format carry no time zone and are interpreted in the local
+            // time zone; ISO-8601 instants are converted from their UTC instant.
+            try {
+                Timestamp timestamp = Timestamp.valueOf(partitionValue);
+                return Literal.ofTimestampNtz(InternalUtils.microsSinceEpoch(timestamp));
+            } catch (IllegalArgumentException e) {
+                Instant instant = Instant.parse(partitionValue);
+                return Literal.ofTimestampNtz(ChronoUnit.MICROS.between(Instant.EPOCH, instant));
+            }
+        }
+
+        throw new UnsupportedOperationException("Unsupported partition column: " + dataType);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
new file mode 100644
index 0000000000..494d3bc5bf
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import java.util.Optional;
+
+import io.delta.kernel.data.ColumnVector;
+import io.delta.kernel.data.ColumnarBatch;
+import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector;
+import io.delta.kernel.expressions.And;
+import io.delta.kernel.expressions.Column;
+import io.delta.kernel.expressions.ExpressionEvaluator;
+import io.delta.kernel.expressions.Literal;
+import io.delta.kernel.expressions.Predicate;
+import io.delta.kernel.expressions.PredicateEvaluator;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+
+/**
+ * Predicate evaluator that combines the predicate with an existing selection vector. The selection
+ * vector is appended to the input batch as an extra boolean column, and
+ * {@code (column = true) AND predicate} is evaluated with {@link DeltaExpressionEvaluator}.
+ */
+public class DeltaPredicateEvaluator implements PredicateEvaluator {
+
+    private final ExpressionEvaluator expressionEvaluator;
+    private static final String EXISTING_SEL_VECTOR_COL_NAME = "____existing_selection_vector_value____";
+    private static final StructField EXISTING_SEL_VECTOR_FIELD =
+            new StructField(EXISTING_SEL_VECTOR_COL_NAME, BooleanType.BOOLEAN, false);
+
+    public DeltaPredicateEvaluator(StructType inputSchema, Predicate predicate) {
+        // Evaluate (existing selection = true) AND predicate over the input schema extended with the
+        // selection-vector column; eval() supplies that column's values.
+        Predicate rewrittenPredicate = new And(
+                new Predicate("=", new Column(EXISTING_SEL_VECTOR_COL_NAME), Literal.ofBoolean(true)), predicate);
+        StructType rewrittenInputSchema = inputSchema.add(EXISTING_SEL_VECTOR_FIELD);
+        this.expressionEvaluator =
+                new DeltaExpressionEvaluator(rewrittenInputSchema, rewrittenPredicate, BooleanType.BOOLEAN);
+    }
+
+    /**
+     * Evaluates the predicate on {@code inputData}. Rows set to {@code false} in
+     * {@code existingSelectionVector} stay unselected; when no selection vector is given, every row
+     * starts out selected.
+     * <p>
+     * The existing selection vector, if present, is closed before this method returns.
+     *
+     * @return boolean vector holding the combined selection
+     */
+    @Override
+    public ColumnVector eval(ColumnarBatch inputData, Optional<ColumnVector> existingSelectionVector) {
+        try {
+            // Only allocate the all-true vector when no selection vector was supplied.
+            ColumnVector newVector = existingSelectionVector
+                    .orElseGet(() -> new DefaultConstantVector(BooleanType.BOOLEAN, inputData.getSize(), true));
+            ColumnarBatch withExistingSelVector =
+                    inputData.withNewColumn(inputData.getSchema().length(), EXISTING_SEL_VECTOR_FIELD, newVector);
+
+            return expressionEvaluator.eval(withExistingSelVector);
+        } finally {
+            // release the existing selection vector.
+            Utils.closeCloseables(existingSelectionVector.orElse(null));
+        }
+    }
+}
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
index b76dd4dbe4..10a527e641 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -60,7 +60,6 @@ import io.delta.kernel.Scan;
 import io.delta.kernel.Snapshot;
 import io.delta.kernel.data.FilteredColumnarBatch;
 import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
 import io.delta.kernel.engine.Engine;
 import io.delta.kernel.exceptions.KernelEngineException;
 import io.delta.kernel.exceptions.KernelException;
@@ -104,7 +103,7 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object>
         configureJobConf(appCtx, conf, configuration);
         confFactory = new ConfFactory(conf);
         String tableMetadataPath = getTablePath(configuration);
-        Engine engine = DefaultEngine.create(conf);
+        Engine engine = DeltaEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
         Snapshot snapshot;
         try {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
new file mode 100644
index 0000000000..388a287bd2
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import static io.delta.kernel.defaults.internal.DefaultEngineErrors.unsupportedExpressionException; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.lang.String.format; +import static org.apache.asterix.external.input.record.reader.aws.delta.ImplicitCastExpression.canCastTo; + +import java.util.Arrays; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.data.MapValue; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.expressions.ScalarExpression; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.StringType; + +/** + * Utility methods to evaluate {@code element_at} expression. + * <p> + * Only lookups in {@code map(string, string)} columns are supported. + */ +class ElementAtEvaluator { + private ElementAtEvaluator() { + } + + /** + * Validate and transform the {@code element_at} expression with given validated and + * transformed inputs. + * <p> + * If the lookup key type differs from the map key type but can be implicitly widened to it, the + * key is wrapped in an {@link ImplicitCastExpression}. + * + * @return a new {@code element_at} expression over {@code mapInput} and the (possibly cast) key + * @throws UnsupportedOperationException if the map is not of type {@code map(string, string)}; + * other exceptions are raised when {@code mapInputType} is not a map type or the key + * type cannot be cast to the map key type + */ + static ScalarExpression validateAndTransform(ScalarExpression elementAt, Expression mapInput, DataType mapInputType, + Expression lookupKey, DataType lookupKeyType) { + + MapType asMapType = validateSupportedMapType(elementAt, mapInputType); + DataType keyTypeFromMapInput = asMapType.getKeyType(); + + if (!keyTypeFromMapInput.equivalent(lookupKeyType)) { + if (canCastTo(lookupKeyType, keyTypeFromMapInput)) { + lookupKey = new 
ImplicitCastExpression(lookupKey, keyTypeFromMapInput); + } else { + String reason = format("lookup key type (%s) is different from the map key type (%s)", lookupKeyType, + asMapType.getKeyType()); + throw unsupportedExpressionException(elementAt, reason); + } + } + return new ScalarExpression(elementAt.getName(), Arrays.asList(mapInput, lookupKey)); + } + + /** + * Utility method to evaluate the {@code element_at} on given map and key vectors. + * @param map {@link ColumnVector} of {@code map(string, string)} type. 
+ * @param lookupKey {@link ColumnVector} of {@code string} type. + * @return result {@link ColumnVector} containing the lookup values; a row is null when the map + * row is null or the key is not present. Closing it closes both input vectors. + */ + static ColumnVector eval(ColumnVector map, ColumnVector lookupKey) { + return new ColumnVector() { + // Store the last lookup value to avoid multiple lookups for the same row id. + // The general pattern is call `isNullAt(rowId)` followed by `getString`. + // So the cache of one value is enough. + private int lastLookupRowId = -1; + private String lastLookupValue = null; + + @Override + public DataType getDataType() { + return ((MapType) map.getDataType()).getValueType(); + } + + @Override + public int getSize() { + return map.getSize(); + } + + @Override + public void close() { + Utils.closeCloseables(map, lookupKey); + } + + @Override + public boolean isNullAt(int rowId) { + if (rowId == lastLookupRowId) { + return lastLookupValue == null; + } + return map.isNullAt(rowId) || lookupValue(rowId) == null; + } + + @Override + public String getString(int rowId) { + return lookupValue(rowId); + } + + /** + * Returns the value for the lookup key of {@code rowId} (null if the map row is null or the + * key is absent), caching it so repeated calls for the same row do not search again. + */ + private String lookupValue(int rowId) { + if (rowId == lastLookupRowId) { + return lastLookupValue; + } + lastLookupRowId = rowId; + if (map.isNullAt(rowId)) { + // Never read the map value of a null row; the lookup result is simply null. + lastLookupValue = null; + } else { + String keyValue = lookupKey.getString(rowId); + lastLookupValue = findValueForKey(map.getMap(rowId), keyValue); + } + return lastLookupValue; + } + + /** + * Given a {@link MapValue} and string {@code key} find the corresponding value. + * Returns null if the key is not in the map. 
+ * @param mapValue String->String map to search + * @param key the key to look up the value for; may be null + */ + private String findValueForKey(MapValue mapValue, String key) { + ColumnVector keyVector = mapValue.getKeys(); + for (int i = 0; i < mapValue.getSize(); i++) { + if ((keyVector.isNullAt(i) && key == null) + || (!keyVector.isNullAt(i) && keyVector.getString(i).equals(key))) { + return mapValue.getValues().isNullAt(i) ? 
null : mapValue.getValues().getString(i); + } + } + // If the key is not in the map return null + return null; + } + }; + } + + private static MapType validateSupportedMapType(Expression elementAt, DataType mapInputType) { + checkArgument(mapInputType instanceof MapType, "expected a map type input as first argument: " + elementAt); + MapType asMapType = (MapType) mapInputType; + // For now we only need to support lookup in columns of type `map(string -> string)`. + // Additional type support may be added later + if (asMapType.getKeyType().equivalent(StringType.STRING) + && asMapType.getValueType().equivalent(StringType.STRING)) { + return asMapType; + } + throw new UnsupportedOperationException( + format("%s: Supported only on type map(string, string) input data", elementAt)); + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java new file mode 100644 index 0000000000..7e2a65be6d --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE; +import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE; +import static java.util.stream.Collectors.joining; + +import java.util.List; +import java.util.Locale; + +import io.delta.kernel.expressions.AlwaysFalse; +import io.delta.kernel.expressions.AlwaysTrue; +import io.delta.kernel.expressions.And; +import io.delta.kernel.expressions.Column; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.expressions.Literal; +import io.delta.kernel.expressions.Or; +import io.delta.kernel.expressions.PartitionValueExpression; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.expressions.ScalarExpression; + +/** + * Abstract visitor over an expression tree. {@link #visit(Expression)} dispatches each node to + * the handler for its specific expression type, and subclasses implement those handlers. + * <p> + * Dispatch is on the node's class and, for {@link ScalarExpression}s, on the expression name. + * + * @param <R> Return type of result of visit expression methods. 
+ */ +abstract class ExpressionVisitor<R> { + + abstract R visitAnd(And and); + + abstract R visitOr(Or or); + + abstract R visitAlwaysTrue(AlwaysTrue alwaysTrue); + + abstract R visitAlwaysFalse(AlwaysFalse alwaysFalse); + + abstract R visitComparator(Predicate predicate); + + abstract R visitLiteral(Literal literal); + + abstract R visitColumn(Column column); + + abstract R visitCast(ImplicitCastExpression cast); + + abstract R visitPartitionValue(PartitionValueExpression partitionValue); + + abstract R visitElementAt(ScalarExpression elementAt); + + abstract R visitNot(Predicate predicate); + + abstract R visitIsNotNull(Predicate predicate); + + abstract R visitIsNull(Predicate predicate); + + abstract R visitCoalesce(ScalarExpression ifNull); + + /** + * Dispatches {@code expression} to the matching {@code visitXxx} method. + * <p> + * {@link PartitionValueExpression} is tested before {@link ScalarExpression}. NOTE(review): + * this order only matters if a partition-value expression is also a scalar expression in the + * Delta kernel version in use - confirm before reordering the checks. + * + * @throws UnsupportedOperationException if the expression type is not handled + */ + final R visit(Expression expression) { + if (expression instanceof PartitionValueExpression) { + return visitPartitionValue((PartitionValueExpression) expression); + } else if (expression instanceof ScalarExpression) { + return visitScalarExpression((ScalarExpression) expression); + } else if (expression instanceof Literal) { + return visitLiteral((Literal) expression); + } else if (expression instanceof Column) { + return visitColumn((Column) expression); + } else if (expression instanceof ImplicitCastExpression) { + return visitCast((ImplicitCastExpression) expression); + } + + throw new UnsupportedOperationException(String.format("Expression %s is not supported.", expression)); + } + + /** + * Dispatches a {@link ScalarExpression} on its name. 
The name is upper-cased with + * {@link Locale#ENGLISH}, so matching is case-insensitive and independent of the default locale. + * <p> + * ALWAYS_TRUE/ALWAYS_FALSE are replaced by the kernel singletons. AND/OR operands must be + * predicates. Comparison, NOT, IS_NOT_NULL and IS_NULL nodes are re-wrapped as + * {@link Predicate}s with the upper-cased name and the original children. + * + * @throws UnsupportedOperationException if the expression name is not recognized + */ + private R visitScalarExpression(ScalarExpression expression) { + List<Expression> children = expression.getChildren(); + String name = expression.getName().toUpperCase(Locale.ENGLISH); + switch (name) { + case "ALWAYS_TRUE": + return visitAlwaysTrue(ALWAYS_TRUE); + case "ALWAYS_FALSE": + return visitAlwaysFalse(ALWAYS_FALSE); + case "AND": + return visitAnd(new And(elemAsPredicate(children, 0), elemAsPredicate(children, 1))); + case "OR": + return visitOr(new Or(elemAsPredicate(children, 0), elemAsPredicate(children, 1))); + // All binary comparisons are routed to visitComparator. + case "=": 
+ case "<": + case "<=": + case ">": + case ">=": + return visitComparator(new Predicate(name, children)); + case "ELEMENT_AT": + return visitElementAt(expression); + case "NOT": + return visitNot(new Predicate(name, children)); + case "IS_NOT_NULL": + return visitIsNotNull(new Predicate(name, children)); + case "IS_NULL": + return visitIsNull(new Predicate(name, children)); + case "COALESCE": + return visitCoalesce(expression); + default: + throw new UnsupportedOperationException( + String.format("Scalar expression `%s` is not supported.", name)); + } + } + + /** + * Returns {@code expressions.get(index)} cast to {@link Predicate}. + * + * @throws RuntimeException if {@code index} is out of range or the element is not a predicate + */ + private static Predicate elemAsPredicate(List<Expression> expressions, int index) { + if (expressions.size() <= index) { + throw new RuntimeException(String.format("Trying to access invalid entry (%d) in list %s", index, + expressions.stream().map(Object::toString).collect(joining(",")))); + } + Expression elemExpression = expressions.get(index); + if (!(elemExpression instanceof Predicate)) { + throw new RuntimeException("Expected a predicate, but got " + elemExpression); + } + return (Predicate) expressions.get(index); + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java new file mode 100644 index 0000000000..febd4d3160 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import static java.lang.String.format; +import static java.util.Collections.unmodifiableMap; +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.expressions.Expression; +import io.delta.kernel.types.DataType; + +/** + * An implicit cast expression to convert the input type to another given type. Here is the valid + * list of casts + * <p> + * <ul> + * <li>{@code byte} to {@code short, int, long, float, double}</li> + * <li>{@code short} to {@code int, long, float, double}</li> + * <li>{@code int} to {@code long, float, double}</li> + * <li>{@code long} to {@code float, double}</li> + * <li>{@code float} to {@code double}</li> + * </ul> + * + * <p> + * The above list is not exhaustive. Based on the need, we can add more casts. + * <p> + * Values are converted with Java primitive widening, so {@code int} or {@code long} to + * {@code float} and {@code long} to {@code double} may lose precision for large magnitudes. + */ +final class ImplicitCastExpression implements Expression { + private final Expression input; + private final DataType outputType; + + /** + * Create a cast around the given input expression to specified output data + * type. 
It is the responsibility of the caller to validate the input expression can be cast + * to the new type using {@link #canCastTo(DataType, DataType)} + * + * @throws NullPointerException if {@code input} or {@code outputType} is null + */ + ImplicitCastExpression(Expression input, DataType outputType) { + this.input = requireNonNull(input, "input is null"); + this.outputType = requireNonNull(outputType, "outputType is null"); + } + + public Expression getInput() { + return input; + } + + public DataType getOutputType() { + return outputType; + } + + @Override + public List<Expression> getChildren() { + return Collections.singletonList(input); + } + + /** + * Evaluate the given column expression on the input {@link ColumnVector}. + * <p> + * Conversion is lazy: the returned vector wraps {@code input} and converts each value when it + * is read. The source type is identified by {@code input.getDataType().toString()}, and + * {@code outputType} is not re-validated here. Note that the {@code input} parameter (a vector) + * shadows the {@code input} field (the expression). + * + * @param input {@link ColumnVector} data of the input to the cast expression. + * @return {@link ColumnVector} result applying target type casting on every element in the + * input {@link ColumnVector}. + * @throws UnsupportedOperationException if the source type is not byte, short, integer, long + * or float + */ + ColumnVector eval(ColumnVector input) { + String fromTypeStr = input.getDataType().toString(); + switch (fromTypeStr) { + case "byte": + return new ByteUpConverter(outputType, input); + case "short": + return new ShortUpConverter(outputType, input); + case "integer": + return new IntUpConverter(outputType, input); + case "long": + return new LongUpConverter(outputType, input); + case "float": + return new FloatUpConverter(outputType, input); + default: + throw new UnsupportedOperationException(format("Cast from %s is not supported", fromTypeStr)); + } + } + + /** + * Maps each source type name (as returned by {@link DataType#toString()}) to the names of the + * types it may be implicitly widened to. 
+ */ + private static final Map<String, List<String>> UP_CASTABLE_TYPE_TABLE; + + static { + Map<String, List<String>> map = new HashMap<>(); + map.put("byte", Arrays.asList("short", "integer", "long", "float", "double")); + map.put("short", Arrays.asList("integer", "long", "float", "double")); + map.put("integer", Arrays.asList("long", "float", "double")); + map.put("long", Arrays.asList("float", "double")); + map.put("float", Arrays.asList("double")); + UP_CASTABLE_TYPE_TABLE = unmodifiableMap(map); + } + + /** + * Utility method which returns whether the given {@code from} type can be cast to {@code to} + * type, according to {@link #UP_CASTABLE_TYPE_TABLE}. Types are compared by their + * {@code toString()} names. + */ + static boolean canCastTo(DataType from, DataType to) { + // TODO: The type name should be a first class method on `DataType` instead of getting it + // using the `toString`. + String fromStr = from.toString(); + String toStr = to.toString(); + return UP_CASTABLE_TYPE_TABLE.containsKey(fromStr) && UP_CASTABLE_TYPE_TABLE.get(fromStr).contains(toStr); + } + + /** + * Base class for up casting {@link ColumnVector} data. 
+ * <p> + * Null checks, size and {@code close()} delegate to the wrapped vector. Subclasses override only + * the getters of the types the source can be widened to; other getters are inherited from + * {@link ColumnVector} (NOTE(review): presumably unsupported there - confirm). + */ + private abstract static class UpConverter implements ColumnVector { + protected final DataType targetType; + protected final ColumnVector inputVector; + + UpConverter(DataType targetType, ColumnVector inputVector) { + this.targetType = targetType; + this.inputVector = inputVector; + } + + @Override + public DataType getDataType() { + return targetType; + } + + @Override + public boolean isNullAt(int rowId) { + return inputVector.isNullAt(rowId); + } + + @Override + public int getSize() { + return inputVector.getSize(); + } + + @Override + public void close() { + inputVector.close(); + } + } + + /** Widens {@code byte} values to {@code short}, {@code int}, {@code long}, {@code float} or {@code double}. */ + private static class ByteUpConverter extends UpConverter { + ByteUpConverter(DataType targetType, ColumnVector inputVector) { + super(targetType, inputVector); + } + + @Override + public short getShort(int rowId) { + return inputVector.getByte(rowId); + } + + @Override + public int getInt(int rowId) { + return inputVector.getByte(rowId); + } + + @Override + public long getLong(int rowId) { + return inputVector.getByte(rowId); + } + + @Override + public float getFloat(int rowId) { + return inputVector.getByte(rowId); + } + + @Override + public double getDouble(int rowId) { + return inputVector.getByte(rowId); + } + } + + /** Widens {@code short} values to {@code int}, {@code long}, {@code float} or {@code double}. 
*/ + private static class ShortUpConverter extends UpConverter { + ShortUpConverter(DataType targetType, ColumnVector inputVector) { + super(targetType, inputVector); + } + + @Override + public int getInt(int rowId) { + return inputVector.getShort(rowId); + } + + @Override + public long getLong(int rowId) { + return inputVector.getShort(rowId); + } + + @Override + public float getFloat(int rowId) { + return inputVector.getShort(rowId); + } + + @Override + public double getDouble(int rowId) { + return inputVector.getShort(rowId); + } + } + + /** Widens {@code int} values to {@code long}, {@code float} or {@code double}. */ + private static class IntUpConverter extends UpConverter { + IntUpConverter(DataType targetType, ColumnVector inputVector) { + super(targetType, inputVector); + } + + @Override + public long getLong(int rowId) { + return 
inputVector.getInt(rowId); + } + + @Override + public float getFloat(int rowId) { + return inputVector.getInt(rowId); + } + + @Override + public double getDouble(int rowId) { + return inputVector.getInt(rowId); + } + } + + /** Widens {@code long} values to {@code float} or {@code double}. */ + private static class LongUpConverter extends UpConverter { + LongUpConverter(DataType targetType, ColumnVector inputVector) { + super(targetType, inputVector); + } + + @Override + public float getFloat(int rowId) { + return inputVector.getLong(rowId); + } + + @Override + public double getDouble(int rowId) { + return inputVector.getLong(rowId); + } + } + + /** Widens {@code float} values to {@code double}. */ + private static class FloatUpConverter extends UpConverter { + FloatUpConverter(DataType targetType, ColumnVector inputVector) { + super(targetType, inputVector); + } + + @Override + public double getDouble(int rowId) { + return inputVector.getFloat(rowId); + } + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java new file mode 100644 index 0000000000..c24b225e7a --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.format.DateTimeParseException; +import java.time.temporal.ChronoUnit; + +import io.delta.kernel.data.ColumnVector; +import io.delta.kernel.internal.util.InternalUtils; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.TimestampNTZType; +import io.delta.kernel.types.TimestampType; + +/** + * Utility methods to evaluate {@code partition_value} expression. + * <p> + * Partition values are read as strings from the input vector and decoded on access by the typed + * getters of the returned vector. + */ +class PartitionValueEvaluator { + /** + * Evaluate the {@code partition_value} expression for given input column vector and generate + * a column vector with decoded values according to the given partition type. 
+ * + * @param input vector of raw partition values, read with {@code getString} + * @param partitionType type used to decode the values and reported by {@code getDataType()} + * @return a vector that decodes {@code input} lazily; closing it closes {@code input} + */ + static ColumnVector eval(ColumnVector input, DataType partitionType) { + return new ColumnVector() { + @Override + public DataType getDataType() { + return partitionType; + } + + @Override + public int getSize() { + return input.getSize(); + } + + @Override + public void close() { + input.close(); + } + + @Override + public boolean isNullAt(int rowId) { + return input.isNullAt(rowId); + } + + @Override + public boolean getBoolean(int rowId) { + return Boolean.parseBoolean(input.getString(rowId)); + } + + @Override + public byte getByte(int rowId) { + return Byte.parseByte(input.getString(rowId)); + } + + @Override + public short getShort(int rowId) { + return Short.parseShort(input.getString(rowId)); + } + + @Override + public int getInt(int rowId) { + if (partitionType.equivalent(IntegerType.INTEGER)) { + return Integer.parseInt(input.getString(rowId)); + } else if (partitionType.equivalent(DateType.DATE)) { + return InternalUtils.daysSinceEpoch(Date.valueOf(input.getString(rowId))); + } + throw new UnsupportedOperationException("Invalid value request for data type"); + } + + @Override + public long getLong(int rowId) { + if (partitionType.equivalent(LongType.LONG)) { + return Long.parseLong(input.getString(rowId)); + } else if (partitionType.equivalent(TimestampType.TIMESTAMP) + || partitionType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) { + return timestampToMicros(input.getString(rowId)); + } + throw new UnsupportedOperationException("Invalid value request for data type"); + } 
+ + @Override + public float getFloat(int rowId) { + return Float.parseFloat(input.getString(rowId)); + } + + @Override + public double getDouble(int rowId) { + return Double.parseDouble(input.getString(rowId)); + } + + @Override + public byte[] getBinary(int rowId) { + // Encode as UTF-8 explicitly; getBytes() without a charset depends on the platform default. + return input.isNullAt(rowId) ? null : input.getString(rowId).getBytes(StandardCharsets.UTF_8); + } + + @Override + public String getString(int rowId) { + return input.getString(rowId); + } + + @Override + public BigDecimal getDecimal(int rowId) { + return input.isNullAt(rowId) ? null : new BigDecimal(input.getString(rowId)); + } + }; + } + + /** + * Converts a timestamp partition value to microseconds since the epoch. + * <p> + * Values in JDBC timestamp escape format ({@code yyyy-[m]m-[d]d hh:mm:ss[.f...]}) carry no time + * zone and are parsed with {@link Timestamp#valueOf(String)}, i.e. in the JVM's local time zone. + * This applies to both timestamp and timestamp_ntz partitions. Any other value is parsed as an + * ISO-8601 instant (for example {@code 2024-01-01T00:00:00Z}) with {@link Instant#parse}. + * + * @throws DateTimeParseException if the value matches neither format; the JDBC-format parse + * failure is attached as a suppressed exception + */ + private static long timestampToMicros(String value) { + try { + return InternalUtils.microsSinceEpoch(Timestamp.valueOf(value)); + } catch (IllegalArgumentException jdbcFormatFailure) { + try { + return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(value)); + } catch (DateTimeParseException isoFormatFailure) { + isoFormatFailure.addSuppressed(jdbcFormatFailure); + throw isoFormatFailure; + } + } + } +}
