This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit bfe4b3a7a2c4091ec9a2daa1de724a9b8d547395 Author: Peeyush Gupta <[email protected]> AuthorDate: Tue Jul 22 12:19:12 2025 -0700 [NO ISSUE][EXT] Upgrade Delta Kernel API to v4.0.0 - user model changes: no - storage format changes: no - interface changes: no Ext-ref: MB-67760 Change-Id: I283206f278ec713a517b7f02016bcb5b321e65fb Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20131 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> --- asterixdb/asterix-app/pom.xml | 5 + .../deltalake/DeltaTableGenerator.java | 38 ++- asterixdb/asterix-external-data/pom.xml | 13 +- .../reader/aws/delta/DefaultExpressionUtils.java | 366 --------------------- .../input/record/reader/aws/delta/DeltaEngine.java | 40 --- .../reader/aws/delta/DeltaExpressionEvaluator.java | 339 ------------------- .../reader/aws/delta/DeltaExpressionHandler.java | 40 --- .../reader/aws/delta/DeltaFileRecordReader.java | 215 +----------- .../reader/aws/delta/DeltaPredicateEvaluator.java | 66 ---- .../reader/aws/delta/DeltaReaderFactory.java | 15 +- .../reader/aws/delta/ElementAtEvaluator.java | 151 --------- .../record/reader/aws/delta/ExpressionVisitor.java | 136 -------- .../reader/aws/delta/ImplicitCastExpression.java | 267 --------------- .../reader/aws/delta/PartitionValueEvaluator.java | 136 -------- hyracks-fullstack/pom.xml | 10 + 15 files changed, 59 insertions(+), 1778 deletions(-) diff --git a/asterixdb/asterix-app/pom.xml b/asterixdb/asterix-app/pom.xml index b04c95716f..bd6b9feed8 100644 --- a/asterixdb/asterix-app/pom.xml +++ b/asterixdb/asterix-app/pom.xml @@ -1128,5 +1128,10 @@ <artifactId>converter</artifactId> <version>0.2.15</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-runtime</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java index a3b20f8616..a8f0b7b674 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/external_dataset/deltalake/DeltaTableGenerator.java @@ -32,6 +32,7 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericData; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hyracks.api.util.IoUtil; import org.apache.parquet.avro.AvroParquetWriter; @@ -133,8 +134,9 @@ public class DeltaTableGenerator { writer.write(record); } - long size = writer.getDataSize(); + FileSystem fs = FileSystem.getLocal(conf); writer.close(); + long size = fs.getFileStatus(path).getLen(); List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, System.currentTimeMillis(), true, null, null)); @@ -154,8 +156,8 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileSecondSnapshotRecords) { writer2.write(record); } - long size2 = writer2.getDataSize(); writer2.close(); + long size2 = fs.getFileStatus(path2).getLen(); AddFile addFile = new AddFile("firstFile.parquet", new HashMap<>(), size, System.currentTimeMillis(), true, null, null); RemoveFile removeFile = addFile.remove(); @@ -201,8 +203,9 @@ public class DeltaTableGenerator { writer.write(record); } - long size = writer.getDataSize(); + FileSystem fs = FileSystem.getLocal(conf); writer.close(); + long size = fs.getFileStatus(path).getLen(); List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, System.currentTimeMillis(), true, null, null)); @@ -224,8 +227,8 @@ public 
class DeltaTableGenerator { writer2.write(record); } - long size2 = writer2.getDataSize(); writer2.close(); + long size2 = fs.getFileStatus(path2).getLen(); List<Action> actions2 = List.of(new AddFile("secondFile.parquet", new HashMap<>(), size2, System.currentTimeMillis(), true, null, null)); @@ -252,9 +255,9 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileFirstSnapshotRecords) { writer.write(record); } - - long size = writer.getDataSize(); + FileSystem fs = FileSystem.getLocal(conf); writer.close(); + long size = fs.getFileStatus(path).getLen(); List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, System.currentTimeMillis(), true, null, null)); @@ -325,8 +328,9 @@ public class DeltaTableGenerator { writer.write(record); } - long size = writer.getDataSize(); + FileSystem fs = FileSystem.getLocal(conf); writer.close(); + long size = fs.getFileStatus(path).getLen(); List<Action> actions = List.of(new AddFile("firstFile.parquet", new HashMap<>(), size, System.currentTimeMillis(), true, null, null)); @@ -348,8 +352,8 @@ public class DeltaTableGenerator { writer2.write(record); } - long size2 = writer2.getDataSize(); writer2.close(); + long size2 = fs.getFileStatus(path2).getLen(); List<Action> actions2 = List.of(new AddFile("File" + i + ".parquet", new HashMap<>(), size2, System.currentTimeMillis(), true, null, null)); @@ -426,8 +430,9 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileFirstSnapshotRecords) { writer.write(record); } - long size = writer.getDataSize(); + FileSystem fs = FileSystem.getLocal(conf); writer.close(); + long size = fs.getFileStatus(path).getLen(); Path path2 = new Path(DELTA_PARTITIONED_TABLE, "secondFile.parquet"); ParquetWriter<GenericData.Record> writer2 = @@ -435,8 +440,8 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileSecondSnapshotRecords) { writer2.write(record); } - long size2 = writer2.getDataSize(); writer2.close(); + 
long size2 = fs.getFileStatus(path2).getLen(); Path path3 = new Path(DELTA_PARTITIONED_TABLE, "thirdFile.parquet"); ParquetWriter<GenericData.Record> writer3 = @@ -444,8 +449,8 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileThirdSnapshotRecords) { writer3.write(record); } - long size3 = writer3.getDataSize(); writer3.close(); + long size3 = fs.getFileStatus(path3).getLen(); Path path4 = new Path(DELTA_PARTITIONED_TABLE, "fourthFile.parquet"); ParquetWriter<GenericData.Record> writer4 = @@ -453,8 +458,8 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileFourthSnapshotRecords) { writer4.write(record); } - long size4 = writer4.getDataSize(); writer4.close(); + long size4 = fs.getFileStatus(path4).getLen(); DeltaLog log = DeltaLog.forTable(conf, DELTA_PARTITIONED_TABLE); OptimisticTransaction txn = log.startTransaction(); @@ -557,8 +562,9 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileFirstSnapshotRecords) { writer.write(record); } - long size = writer.getDataSize(); + FileSystem fs = FileSystem.getLocal(conf); writer.close(); + long size = fs.getFileStatus(path).getLen(); Path path2 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "secondFile.parquet"); ParquetWriter<GenericData.Record> writer2 = @@ -566,8 +572,8 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileSecondSnapshotRecords) { writer2.write(record); } - long size2 = writer2.getDataSize(); writer2.close(); + long size2 = fs.getFileStatus(path2).getLen(); Path path3 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "thirdFile.parquet"); ParquetWriter<GenericData.Record> writer3 = @@ -575,8 +581,8 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileThirdSnapshotRecords) { writer3.write(record); } - long size3 = writer3.getDataSize(); writer3.close(); + long size3 = fs.getFileStatus(path3).getLen(); Path path4 = new Path(DELTA_TIMESTAMP_PARTITIONED_TABLE, "fourthFile.parquet"); 
ParquetWriter<GenericData.Record> writer4 = @@ -584,8 +590,8 @@ public class DeltaTableGenerator { for (GenericData.Record record : fileFourthSnapshotRecords) { writer4.write(record); } - long size4 = writer4.getDataSize(); writer4.close(); + long size4 = fs.getFileStatus(path4).getLen(); DeltaLog log = DeltaLog.forTable(conf, DELTA_TIMESTAMP_PARTITIONED_TABLE); OptimisticTransaction txn = log.startTransaction(); diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index b189177389..ae894d5f55 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -579,12 +579,12 @@ <dependency> <groupId>io.delta</groupId> <artifactId>delta-kernel-api</artifactId> - <version>3.2.1</version> + <version>4.0.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-kernel-defaults</artifactId> - <version>3.2.1</version> + <version>4.0.0</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> @@ -596,6 +596,15 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-jdk8</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-runtime</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-standalone_2.12</artifactId> diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java deleted file mode 100644 index 560360c840..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DefaultExpressionUtils.java +++ /dev/null @@ -1,366 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import static io.delta.kernel.internal.util.Preconditions.checkArgument; - -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.util.Comparator; -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; - -import io.delta.kernel.data.ArrayValue; -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.data.MapValue; -import io.delta.kernel.expressions.Expression; -import io.delta.kernel.internal.util.Utils; -import io.delta.kernel.types.BinaryType; -import io.delta.kernel.types.BooleanType; -import io.delta.kernel.types.ByteType; -import io.delta.kernel.types.DataType; -import io.delta.kernel.types.DateType; -import io.delta.kernel.types.DecimalType; -import io.delta.kernel.types.DoubleType; -import io.delta.kernel.types.FloatType; -import io.delta.kernel.types.IntegerType; -import io.delta.kernel.types.LongType; -import io.delta.kernel.types.ShortType; -import io.delta.kernel.types.StringType; -import io.delta.kernel.types.TimestampNTZType; -import io.delta.kernel.types.TimestampType; - -/** - * Utility methods used by the default expression evaluator. 
- */ -class DefaultExpressionUtils { - private DefaultExpressionUtils() { - } - - static final Comparator<String> STRING_COMPARATOR = (leftOp, rightOp) -> { - byte[] leftBytes = leftOp.getBytes(StandardCharsets.UTF_8); - byte[] rightBytes = rightOp.getBytes(StandardCharsets.UTF_8); - int i = 0; - while (i < leftBytes.length && i < rightBytes.length) { - if (leftBytes[i] != rightBytes[i]) { - return Byte.toUnsignedInt(leftBytes[i]) - Byte.toUnsignedInt(rightBytes[i]); - } - i++; - } - return Integer.compare(leftBytes.length, rightBytes.length); - }; - - static final Comparator<byte[]> BINARY_COMPARTOR = (leftOp, rightOp) -> { - int i = 0; - while (i < leftOp.length && i < rightOp.length) { - if (leftOp[i] != rightOp[i]) { - return Byte.toUnsignedInt(leftOp[i]) - Byte.toUnsignedInt(rightOp[i]); - } - i++; - } - return Integer.compare(leftOp.length, rightOp.length); - }; - - /** - * Utility method that calculates the nullability result from given two vectors. Result is - * null if at least one side is a null. - */ - static boolean[] evalNullability(ColumnVector left, ColumnVector right) { - int numRows = left.getSize(); - boolean[] nullability = new boolean[numRows]; - for (int rowId = 0; rowId < numRows; rowId++) { - nullability[rowId] = left.isNullAt(rowId) || right.isNullAt(rowId); - } - return nullability; - } - - /** - * Wraps a child vector as a boolean {@link ColumnVector} with the given value and nullability - * accessors. 
- */ - static ColumnVector booleanWrapperVector(ColumnVector childVector, Function<Integer, Boolean> valueAccessor, - Function<Integer, Boolean> nullabilityAccessor) { - - return new ColumnVector() { - - @Override - public DataType getDataType() { - return BooleanType.BOOLEAN; - } - - @Override - public int getSize() { - return childVector.getSize(); - } - - @Override - public void close() { - childVector.close(); - } - - @Override - public boolean isNullAt(int rowId) { - return nullabilityAccessor.apply(rowId); - } - - @Override - public boolean getBoolean(int rowId) { - return valueAccessor.apply(rowId); - } - }; - } - - /** - * Utility method to compare the left and right according to the natural ordering - * and return an integer array where each row contains the comparison result (-1, 0, 1) for - * corresponding rows in the input vectors compared. - * <p> - * Only primitive data types are supported. - */ - static int[] compare(ColumnVector left, ColumnVector right) { - checkArgument(left.getSize() == right.getSize(), "Left and right operand have different vector sizes."); - DataType dataType = left.getDataType(); - - int numRows = left.getSize(); - int[] result = new int[numRows]; - if (dataType instanceof BooleanType) { - compareBoolean(left, right, result); - } else if (dataType instanceof ByteType) { - compareByte(left, right, result); - } else if (dataType instanceof ShortType) { - compareShort(left, right, result); - } else if (dataType instanceof IntegerType || dataType instanceof DateType) { - compareInt(left, right, result); - } else if (dataType instanceof LongType || dataType instanceof TimestampType - || dataType instanceof TimestampNTZType) { - compareLong(left, right, result); - } else if (dataType instanceof FloatType) { - compareFloat(left, right, result); - } else if (dataType instanceof DoubleType) { - compareDouble(left, right, result); - } else if (dataType instanceof DecimalType) { - compareDecimal(left, right, result); - } else if 
(dataType instanceof StringType) { - compareString(left, right, result); - } else if (dataType instanceof BinaryType) { - compareBinary(left, right, result); - } else { - throw new UnsupportedOperationException(dataType + " can not be compared."); - } - return result; - } - - static void compareBoolean(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = Boolean.compare(left.getBoolean(rowId), right.getBoolean(rowId)); - } - } - } - - static void compareByte(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = Byte.compare(left.getByte(rowId), right.getByte(rowId)); - } - } - } - - static void compareShort(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = Short.compare(left.getShort(rowId), right.getShort(rowId)); - } - } - } - - static void compareInt(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = Integer.compare(left.getInt(rowId), right.getInt(rowId)); - } - } - } - - static void compareLong(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = Long.compare(left.getLong(rowId), right.getLong(rowId)); - } - } - } - - static void compareFloat(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = Float.compare(left.getFloat(rowId), right.getFloat(rowId)); - } - } - } - 
- static void compareDouble(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = Double.compare(left.getDouble(rowId), right.getDouble(rowId)); - } - } - } - - static void compareString(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = STRING_COMPARATOR.compare(left.getString(rowId), right.getString(rowId)); - } - } - } - - static void compareDecimal(ColumnVector left, ColumnVector right, int[] result) { - Comparator<BigDecimal> comparator = Comparator.naturalOrder(); - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = comparator.compare(left.getDecimal(rowId), right.getDecimal(rowId)); - } - } - } - - static void compareBinary(ColumnVector left, ColumnVector right, int[] result) { - for (int rowId = 0; rowId < left.getSize(); rowId++) { - if (!left.isNullAt(rowId) && !right.isNullAt(rowId)) { - result[rowId] = BINARY_COMPARTOR.compare(left.getBinary(rowId), right.getBinary(rowId)); - } - } - } - - static Expression childAt(Expression expression, int index) { - return expression.getChildren().get(index); - } - - /** - * Combines a list of column vectors into one column vector based on the resolution of - * idxToReturn - * @param vectors List of ColumnVectors of the same data type with length >= 1 - * @param idxToReturn Function that takes in a rowId and returns the index of the column vector - * to use as the return value - */ - static ColumnVector combinationVector(List<ColumnVector> vectors, Function<Integer, Integer> idxToReturn) { - return new ColumnVector() { - // Store the last lookup value to avoid multiple looks up for same rowId. 
- // The general pattern is call `isNullAt(rowId)` followed by `getBoolean(rowId)` or - // some other value accessor. So the cache of one value is enough. - private int lastLookupRowId = -1; - private ColumnVector lastLookupVector = null; - - @Override - public DataType getDataType() { - return vectors.get(0).getDataType(); - } - - @Override - public int getSize() { - return vectors.get(0).getSize(); - } - - @Override - public void close() { - Utils.closeCloseables(vectors.toArray(new ColumnVector[0])); - } - - @Override - public boolean isNullAt(int rowId) { - return getVector(rowId).isNullAt(rowId); - } - - @Override - public boolean getBoolean(int rowId) { - return getVector(rowId).getBoolean(rowId); - } - - @Override - public byte getByte(int rowId) { - return getVector(rowId).getByte(rowId); - } - - @Override - public short getShort(int rowId) { - return getVector(rowId).getShort(rowId); - } - - @Override - public int getInt(int rowId) { - return getVector(rowId).getInt(rowId); - } - - @Override - public long getLong(int rowId) { - return getVector(rowId).getLong(rowId); - } - - @Override - public float getFloat(int rowId) { - return getVector(rowId).getFloat(rowId); - } - - @Override - public double getDouble(int rowId) { - return getVector(rowId).getDouble(rowId); - } - - @Override - public byte[] getBinary(int rowId) { - return getVector(rowId).getBinary(rowId); - } - - @Override - public String getString(int rowId) { - return getVector(rowId).getString(rowId); - } - - @Override - public BigDecimal getDecimal(int rowId) { - return getVector(rowId).getDecimal(rowId); - } - - @Override - public MapValue getMap(int rowId) { - return getVector(rowId).getMap(rowId); - } - - @Override - public ArrayValue getArray(int rowId) { - return getVector(rowId).getArray(rowId); - } - - @Override - public ColumnVector getChild(int ordinal) { - return combinationVector(vectors.stream().map(v -> v.getChild(ordinal)).collect(Collectors.toList()), - idxToReturn); - } - - 
private ColumnVector getVector(int rowId) { - if (rowId == lastLookupRowId) { - return lastLookupVector; - } - lastLookupRowId = rowId; - lastLookupVector = vectors.get(idxToReturn.apply(rowId)); - return lastLookupVector; - } - }; - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java deleted file mode 100644 index 1b4e12e62b..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaEngine.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import org.apache.hadoop.conf.Configuration; - -import io.delta.kernel.defaults.engine.DefaultEngine; -import io.delta.kernel.engine.ExpressionHandler; - -public class DeltaEngine extends DefaultEngine { - - protected DeltaEngine(Configuration configuration) { - super(configuration); - } - - @Override - public ExpressionHandler getExpressionHandler() { - return new DeltaExpressionHandler(); - } - - public static DeltaEngine create(Configuration configuration) { - return new DeltaEngine(configuration); - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java deleted file mode 100644 index b3082b5e8a..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionEvaluator.java +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import static io.delta.kernel.internal.util.ExpressionUtils.getLeft; -import static io.delta.kernel.internal.util.ExpressionUtils.getRight; -import static io.delta.kernel.internal.util.ExpressionUtils.getUnaryChild; -import static io.delta.kernel.internal.util.Preconditions.checkArgument; -import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.booleanWrapperVector; -import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.childAt; -import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.compare; -import static org.apache.asterix.external.input.record.reader.aws.delta.DefaultExpressionUtils.evalNullability; - -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.data.ColumnarBatch; -import io.delta.kernel.defaults.internal.data.vector.DefaultBooleanVector; -import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector; -import io.delta.kernel.defaults.internal.expressions.DefaultExpressionEvaluator; -import io.delta.kernel.expressions.AlwaysFalse; -import io.delta.kernel.expressions.AlwaysTrue; -import io.delta.kernel.expressions.And; -import io.delta.kernel.expressions.Column; -import io.delta.kernel.expressions.Expression; -import io.delta.kernel.expressions.Literal; -import io.delta.kernel.expressions.Or; -import io.delta.kernel.expressions.PartitionValueExpression; -import io.delta.kernel.expressions.Predicate; -import io.delta.kernel.expressions.ScalarExpression; -import io.delta.kernel.types.BinaryType; -import io.delta.kernel.types.BooleanType; -import io.delta.kernel.types.ByteType; -import io.delta.kernel.types.DataType; -import io.delta.kernel.types.DateType; -import io.delta.kernel.types.DecimalType; -import io.delta.kernel.types.DoubleType; -import 
io.delta.kernel.types.FloatType; -import io.delta.kernel.types.IntegerType; -import io.delta.kernel.types.LongType; -import io.delta.kernel.types.ShortType; -import io.delta.kernel.types.StringType; -import io.delta.kernel.types.StructType; -import io.delta.kernel.types.TimestampNTZType; -import io.delta.kernel.types.TimestampType; - -public class DeltaExpressionEvaluator extends DefaultExpressionEvaluator { - - private final Expression expression; - - public DeltaExpressionEvaluator(StructType structType, Expression expression, DataType dataType) { - super(structType, expression, dataType); - this.expression = expression; - } - - @Override - public ColumnVector eval(ColumnarBatch input) { - return new ExpressionEvalVisitor(input).visit(expression); - } - - /** - * Implementation of {@link ExpressionVisitor} to evaluate expression on a - * {@link ColumnarBatch}. - */ - private static class ExpressionEvalVisitor extends ExpressionVisitor<ColumnVector> { - private final ColumnarBatch input; - - ExpressionEvalVisitor(ColumnarBatch input) { - this.input = input; - } - - /* - | Operand 1 | Operand 2 | `AND` | `OR` | - |-----------|-----------|------------|------------| - | True | True | True | True | - | True | False | False | True | - | True | NULL | NULL | True | - | False | True | False | True | - | False | False | False | False | - | False | NULL | False | NULL | - | NULL | True | NULL | True | - | NULL | False | False | NULL | - | NULL | NULL | NULL | NULL | - */ - @Override - ColumnVector visitAnd(And and) { - PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(and); - ColumnVector left = argResults.leftResult; - ColumnVector right = argResults.rightResult; - int numRows = argResults.rowCount; - boolean[] result = new boolean[numRows]; - boolean[] nullability = new boolean[numRows]; - for (int rowId = 0; rowId < numRows; rowId++) { - boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId); - boolean rightIsTrue = 
!right.isNullAt(rowId) && right.getBoolean(rowId); - boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId); - boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId); - - if (leftIsFalse || rightIsFalse) { - nullability[rowId] = false; - result[rowId] = false; - } else if (leftIsTrue && rightIsTrue) { - nullability[rowId] = false; - result[rowId] = true; - } else { - nullability[rowId] = true; - // result[rowId] is undefined when nullability[rowId] = true - } - } - return new DefaultBooleanVector(numRows, Optional.of(nullability), result); - } - - @Override - ColumnVector visitOr(Or or) { - PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(or); - ColumnVector left = argResults.leftResult; - ColumnVector right = argResults.rightResult; - int numRows = argResults.rowCount; - boolean[] result = new boolean[numRows]; - boolean[] nullability = new boolean[numRows]; - for (int rowId = 0; rowId < numRows; rowId++) { - boolean leftIsTrue = !left.isNullAt(rowId) && left.getBoolean(rowId); - boolean rightIsTrue = !right.isNullAt(rowId) && right.getBoolean(rowId); - boolean leftIsFalse = !left.isNullAt(rowId) && !left.getBoolean(rowId); - boolean rightIsFalse = !right.isNullAt(rowId) && !right.getBoolean(rowId); - - if (leftIsTrue || rightIsTrue) { - nullability[rowId] = false; - result[rowId] = true; - } else if (leftIsFalse && rightIsFalse) { - nullability[rowId] = false; - result[rowId] = false; - } else { - nullability[rowId] = true; - // result[rowId] is undefined when nullability[rowId] = true - } - } - return new DefaultBooleanVector(numRows, Optional.of(nullability), result); - } - - @Override - ColumnVector visitAlwaysTrue(AlwaysTrue alwaysTrue) { - return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), true); - } - - @Override - ColumnVector visitAlwaysFalse(AlwaysFalse alwaysFalse) { - return new DefaultConstantVector(BooleanType.BOOLEAN, input.getSize(), false); - } - - @Override - ColumnVector 
visitComparator(Predicate predicate) { - PredicateChildrenEvalResult argResults = evalBinaryExpressionChildren(predicate); - - int numRows = argResults.rowCount; - boolean[] result = new boolean[numRows]; - boolean[] nullability = evalNullability(argResults.leftResult, argResults.rightResult); - int[] compareResult = compare(argResults.leftResult, argResults.rightResult); - switch (predicate.getName()) { - case "=": - for (int rowId = 0; rowId < numRows; rowId++) { - result[rowId] = compareResult[rowId] == 0; - } - break; - case ">": - for (int rowId = 0; rowId < numRows; rowId++) { - result[rowId] = compareResult[rowId] > 0; - } - break; - case ">=": - for (int rowId = 0; rowId < numRows; rowId++) { - result[rowId] = compareResult[rowId] >= 0; - } - break; - case "<": - for (int rowId = 0; rowId < numRows; rowId++) { - result[rowId] = compareResult[rowId] < 0; - } - break; - case "<=": - for (int rowId = 0; rowId < numRows; rowId++) { - result[rowId] = compareResult[rowId] <= 0; - } - break; - default: - // We should never reach this based on the ExpressionVisitor - throw new IllegalStateException( - String.format("%s is not a recognized comparator", predicate.getName())); - } - - return new DefaultBooleanVector(numRows, Optional.of(nullability), result); - } - - @Override - ColumnVector visitLiteral(Literal literal) { - DataType dataType = literal.getDataType(); - if (dataType instanceof BooleanType || dataType instanceof ByteType || dataType instanceof ShortType - || dataType instanceof IntegerType || dataType instanceof LongType || dataType instanceof FloatType - || dataType instanceof DoubleType || dataType instanceof StringType - || dataType instanceof BinaryType || dataType instanceof DecimalType || dataType instanceof DateType - || dataType instanceof TimestampType || dataType instanceof TimestampNTZType) { - return new DefaultConstantVector(dataType, input.getSize(), literal.getValue()); - } - - throw new UnsupportedOperationException("unsupported 
expression encountered: " + literal); - } - - @Override - ColumnVector visitColumn(Column column) { - String[] names = column.getNames(); - DataType currentType = input.getSchema(); - ColumnVector columnVector = null; - for (int level = 0; level < names.length; level++) { - assertColumnExists(currentType instanceof StructType, input.getSchema(), column); - StructType structSchema = ((StructType) currentType); - int ordinal = structSchema.indexOf(names[level]); - assertColumnExists(ordinal != -1, input.getSchema(), column); - currentType = structSchema.at(ordinal).getDataType(); - - if (level == 0) { - columnVector = input.getColumnVector(ordinal); - } else { - columnVector = columnVector.getChild(ordinal); - } - } - assertColumnExists(columnVector != null, input.getSchema(), column); - return columnVector; - } - - @Override - ColumnVector visitCast(ImplicitCastExpression cast) { - ColumnVector inputResult = visit(cast.getInput()); - return cast.eval(inputResult); - } - - @Override - ColumnVector visitPartitionValue(PartitionValueExpression partitionValue) { - ColumnVector input = visit(partitionValue.getInput()); - return PartitionValueEvaluator.eval(input, partitionValue.getDataType()); - } - - @Override - ColumnVector visitElementAt(ScalarExpression elementAt) { - ColumnVector map = visit(childAt(elementAt, 0)); - ColumnVector lookupKey = visit(childAt(elementAt, 1)); - return ElementAtEvaluator.eval(map, lookupKey); - } - - @Override - ColumnVector visitNot(Predicate predicate) { - ColumnVector childResult = visit(childAt(predicate, 0)); - return booleanWrapperVector(childResult, rowId -> !childResult.getBoolean(rowId), - rowId -> childResult.isNullAt(rowId)); - } - - @Override - ColumnVector visitIsNotNull(Predicate predicate) { - ColumnVector childResult = visit(childAt(predicate, 0)); - return booleanWrapperVector(childResult, rowId -> !childResult.isNullAt(rowId), rowId -> false); - } - - @Override - ColumnVector visitIsNull(Predicate predicate) { - 
ColumnVector childResult = visit(getUnaryChild(predicate)); - return booleanWrapperVector(childResult, rowId -> childResult.isNullAt(rowId), rowId -> false); - } - - @Override - ColumnVector visitCoalesce(ScalarExpression coalesce) { - List<ColumnVector> childResults = - coalesce.getChildren().stream().map(this::visit).collect(Collectors.toList()); - return DefaultExpressionUtils.combinationVector(childResults, rowId -> { - for (int idx = 0; idx < childResults.size(); idx++) { - if (!childResults.get(idx).isNullAt(rowId)) { - return idx; - } - } - return 0; // If all are null then any idx suffices - }); - } - - /** - * Utility method to evaluate inputs to the binary input expression. Also validates the - * evaluated expression result {@link ColumnVector}s are of the same size. - * - * @param predicate - * @return Triplet of (result vector size, left operand result, left operand result) - */ - private PredicateChildrenEvalResult evalBinaryExpressionChildren(Predicate predicate) { - ColumnVector left = visit(getLeft(predicate)); - ColumnVector right = visit(getRight(predicate)); - checkArgument(left.getSize() == right.getSize(), - "Left and right operand returned different results: left=%d, right=d", left.getSize(), - right.getSize()); - return new PredicateChildrenEvalResult(left.getSize(), left, right); - } - } - - /** - * Encapsulates children expression result of binary input predicate - */ - private static class PredicateChildrenEvalResult { - public final int rowCount; - public final ColumnVector leftResult; - public final ColumnVector rightResult; - - PredicateChildrenEvalResult(int rowCount, ColumnVector leftResult, ColumnVector rightResult) { - this.rowCount = rowCount; - this.leftResult = leftResult; - this.rightResult = rightResult; - } - } - - private static void assertColumnExists(boolean condition, StructType schema, Column column) { - if (!condition) { - throw new IllegalArgumentException( - String.format("%s doesn't exist in input data schema: %s", 
column, schema)); - } - } - -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java deleted file mode 100644 index d9f5251420..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaExpressionHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import io.delta.kernel.defaults.engine.DefaultExpressionHandler; -import io.delta.kernel.expressions.Expression; -import io.delta.kernel.expressions.ExpressionEvaluator; -import io.delta.kernel.expressions.Predicate; -import io.delta.kernel.expressions.PredicateEvaluator; -import io.delta.kernel.types.DataType; -import io.delta.kernel.types.StructType; - -public class DeltaExpressionHandler extends DefaultExpressionHandler { - - @Override - public ExpressionEvaluator getEvaluator(StructType inputSchema, Expression expression, DataType outputType) { - return new DeltaExpressionEvaluator(inputSchema, expression, outputType); - } - - @Override - public PredicateEvaluator getPredicateEvaluator(StructType inputSchema, Predicate predicate) { - return new DeltaPredicateEvaluator(inputSchema, predicate); - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java index 44dfdf006c..121a76b2e8 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java @@ -22,14 +22,8 @@ import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; -import java.time.Instant; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import org.apache.asterix.common.exceptions.ErrorCode; @@ -45,40 
+39,15 @@ import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.hdfs.dataflow.ConfFactory; import io.delta.kernel.Scan; -import io.delta.kernel.data.ColumnVector; import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; -import io.delta.kernel.engine.ExpressionHandler; -import io.delta.kernel.expressions.ExpressionEvaluator; -import io.delta.kernel.expressions.Literal; import io.delta.kernel.expressions.Predicate; import io.delta.kernel.internal.InternalScanFileUtils; -import io.delta.kernel.internal.actions.DeletionVectorDescriptor; import io.delta.kernel.internal.data.ScanStateRow; -import io.delta.kernel.internal.data.SelectionColumnVector; -import io.delta.kernel.internal.deletionvectors.DeletionVectorUtils; -import io.delta.kernel.internal.deletionvectors.RoaringBitmapArray; -import io.delta.kernel.internal.util.ColumnMapping; -import io.delta.kernel.internal.util.InternalUtils; -import io.delta.kernel.internal.util.Tuple2; -import io.delta.kernel.types.BinaryType; -import io.delta.kernel.types.BooleanType; -import io.delta.kernel.types.ByteType; -import io.delta.kernel.types.DataType; -import io.delta.kernel.types.DateType; -import io.delta.kernel.types.DecimalType; -import io.delta.kernel.types.DoubleType; -import io.delta.kernel.types.FloatType; -import io.delta.kernel.types.IntegerType; -import io.delta.kernel.types.LongType; -import io.delta.kernel.types.ShortType; -import io.delta.kernel.types.StringType; -import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; -import io.delta.kernel.types.TimestampNTZType; -import io.delta.kernel.types.TimestampType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; @@ -105,7 +74,7 @@ public class DeltaFileRecordReader implements IRecordReader<Row> { public 
DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config, String filterExpressionStr) throws HyracksDataException { JobConf conf = config.getConf(); - this.engine = DeltaEngine.create(conf); + this.engine = DefaultEngine.create(conf); this.scanFiles = new ArrayList<>(); for (String scanFile : serScanFiles) { this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile)); @@ -125,7 +94,7 @@ public class DeltaFileRecordReader implements IRecordReader<Row> { try { this.physicalDataIter = engine.getParquetHandler() .readParquetFiles(singletonCloseableIterator(fileStatus), physicalReadSchema, filterPredicate); - this.dataIter = transformPhysicalData(engine, scanState, scanFile, physicalDataIter); + this.dataIter = Scan.transformPhysicalData(engine, scanState, scanFile, physicalDataIter); if (dataIter.hasNext()) { rows = dataIter.next().getRows(); } @@ -196,182 +165,4 @@ public class DeltaFileRecordReader implements IRecordReader<Row> { public boolean handleException(Throwable th) { return false; } - - static CloseableIterator<FilteredColumnarBatch> transformPhysicalData(Engine engine, Row scanState, Row scanFile, - CloseableIterator<ColumnarBatch> physicalDataIter) throws IOException { - return new CloseableIterator<FilteredColumnarBatch>() { - boolean inited = false; - - // initialized as part of init() - StructType physicalReadSchema = null; - StructType logicalReadSchema = null; - String tablePath = null; - - RoaringBitmapArray currBitmap = null; - DeletionVectorDescriptor currDV = null; - - private void initIfRequired() { - if (inited) { - return; - } - physicalReadSchema = ScanStateRow.getPhysicalSchema(engine, scanState); - logicalReadSchema = ScanStateRow.getLogicalSchema(engine, scanState); - - tablePath = ScanStateRow.getTableRoot(scanState); - inited = true; - } - - @Override - public void close() throws IOException { - physicalDataIter.close(); - } - - @Override - public boolean hasNext() { - initIfRequired(); - return 
physicalDataIter.hasNext(); - } - - @Override - public FilteredColumnarBatch next() { - initIfRequired(); - ColumnarBatch nextDataBatch = physicalDataIter.next(); - - DeletionVectorDescriptor dv = InternalScanFileUtils.getDeletionVectorDescriptorFromRow(scanFile); - - int rowIndexOrdinal = nextDataBatch.getSchema().indexOf(StructField.METADATA_ROW_INDEX_COLUMN_NAME); - - // Get the selectionVector if DV is present - Optional<ColumnVector> selectionVector; - if (dv == null) { - selectionVector = Optional.empty(); - } else { - if (rowIndexOrdinal == -1) { - throw new IllegalArgumentException( - "Row index column is not " + "present in the data read from the Parquet file."); - } - if (!dv.equals(currDV)) { - Tuple2<DeletionVectorDescriptor, RoaringBitmapArray> dvInfo = - DeletionVectorUtils.loadNewDvAndBitmap(engine, tablePath, dv); - this.currDV = dvInfo._1; - this.currBitmap = dvInfo._2; - } - ColumnVector rowIndexVector = nextDataBatch.getColumnVector(rowIndexOrdinal); - selectionVector = Optional.of(new SelectionColumnVector(currBitmap, rowIndexVector)); - } - if (rowIndexOrdinal != -1) { - nextDataBatch = nextDataBatch.withDeletedColumnAt(rowIndexOrdinal); - } - - // Add partition columns - nextDataBatch = withPartitionColumns(engine.getExpressionHandler(), nextDataBatch, - InternalScanFileUtils.getPartitionValues(scanFile), physicalReadSchema); - - // Change back to logical schema - String columnMappingMode = ScanStateRow.getColumnMappingMode(scanState); - switch (columnMappingMode) { - case ColumnMapping.COLUMN_MAPPING_MODE_NAME: - case ColumnMapping.COLUMN_MAPPING_MODE_ID: - nextDataBatch = nextDataBatch.withNewSchema(logicalReadSchema); - break; - case ColumnMapping.COLUMN_MAPPING_MODE_NONE: - break; - default: - throw new UnsupportedOperationException( - "Column mapping mode is not yet supported: " + columnMappingMode); - } - - return new FilteredColumnarBatch(nextDataBatch, selectionVector); - } - }; - } - - public static ColumnarBatch 
withPartitionColumns(ExpressionHandler expressionHandler, ColumnarBatch dataBatch, - Map<String, String> partitionValues, StructType schemaWithPartitionCols) { - if (partitionValues == null || partitionValues.size() == 0) { - // no partition column vectors to attach to. - return dataBatch; - } - - for (int colIdx = 0; colIdx < schemaWithPartitionCols.length(); colIdx++) { - StructField structField = schemaWithPartitionCols.at(colIdx); - - if (partitionValues.containsKey(structField.getName())) { - // Create a partition vector - - ExpressionEvaluator evaluator = expressionHandler.getEvaluator(dataBatch.getSchema(), - literalForPartitionValue(structField.getDataType(), partitionValues.get(structField.getName())), - structField.getDataType()); - - ColumnVector partitionVector = evaluator.eval(dataBatch); - dataBatch = dataBatch.withNewColumn(colIdx, structField, partitionVector); - } - } - - return dataBatch; - } - - protected static Literal literalForPartitionValue(DataType dataType, String partitionValue) { - if (partitionValue == null) { - return Literal.ofNull(dataType); - } - - if (dataType instanceof BooleanType) { - return Literal.ofBoolean(Boolean.parseBoolean(partitionValue)); - } - if (dataType instanceof ByteType) { - return Literal.ofByte(Byte.parseByte(partitionValue)); - } - if (dataType instanceof ShortType) { - return Literal.ofShort(Short.parseShort(partitionValue)); - } - if (dataType instanceof IntegerType) { - return Literal.ofInt(Integer.parseInt(partitionValue)); - } - if (dataType instanceof LongType) { - return Literal.ofLong(Long.parseLong(partitionValue)); - } - if (dataType instanceof FloatType) { - return Literal.ofFloat(Float.parseFloat(partitionValue)); - } - if (dataType instanceof DoubleType) { - return Literal.ofDouble(Double.parseDouble(partitionValue)); - } - if (dataType instanceof StringType) { - return Literal.ofString(partitionValue); - } - if (dataType instanceof BinaryType) { - return 
Literal.ofBinary(partitionValue.getBytes()); - } - if (dataType instanceof DateType) { - return Literal.ofDate(InternalUtils.daysSinceEpoch(Date.valueOf(partitionValue))); - } - if (dataType instanceof DecimalType) { - DecimalType decimalType = (DecimalType) dataType; - return Literal.ofDecimal(new BigDecimal(partitionValue), decimalType.getPrecision(), - decimalType.getScale()); - } - if (dataType instanceof TimestampType) { - try { - Timestamp timestamp = Timestamp.valueOf(partitionValue); - return Literal.ofTimestamp(InternalUtils.microsSinceEpoch(timestamp)); - } catch (IllegalArgumentException e) { - Instant instant = Instant.parse(partitionValue); - return Literal.ofTimestamp(ChronoUnit.MICROS.between(Instant.EPOCH, instant)); - } - } - if (dataType instanceof TimestampNTZType) { - // Both the timestamp and timestamp_ntz have no timezone info, so they are interpreted - // in local time zone. - try { - Timestamp timestamp = Timestamp.valueOf(partitionValue); - return Literal.ofTimestampNtz(InternalUtils.microsSinceEpoch(timestamp)); - } catch (IllegalArgumentException e) { - Instant instant = Instant.parse(partitionValue); - return Literal.ofTimestampNtz(ChronoUnit.MICROS.between(Instant.EPOCH, instant)); - } - } - - throw new UnsupportedOperationException("Unsupported partition column: " + dataType); - } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java deleted file mode 100644 index 494d3bc5bf..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaPredicateEvaluator.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import java.util.Optional; - -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.data.ColumnarBatch; -import io.delta.kernel.defaults.internal.data.vector.DefaultConstantVector; -import io.delta.kernel.expressions.And; -import io.delta.kernel.expressions.Column; -import io.delta.kernel.expressions.ExpressionEvaluator; -import io.delta.kernel.expressions.Literal; -import io.delta.kernel.expressions.Predicate; -import io.delta.kernel.expressions.PredicateEvaluator; -import io.delta.kernel.internal.util.Utils; -import io.delta.kernel.types.BooleanType; -import io.delta.kernel.types.StructField; -import io.delta.kernel.types.StructType; - -public class DeltaPredicateEvaluator implements PredicateEvaluator { - - private final ExpressionEvaluator expressionEvaluator; - private static final String EXISTING_SEL_VECTOR_COL_NAME = "____existing_selection_vector_value____"; - private static final StructField EXISTING_SEL_VECTOR_FIELD = - new StructField(EXISTING_SEL_VECTOR_COL_NAME, BooleanType.BOOLEAN, false); - - public DeltaPredicateEvaluator(StructType inputSchema, Predicate predicate) { - Predicate rewrittenPredicate = new And( - new Predicate("=", new Column(EXISTING_SEL_VECTOR_COL_NAME), 
Literal.ofBoolean(true)), predicate); - StructType rewrittenInputSchema = inputSchema.add(EXISTING_SEL_VECTOR_FIELD); - this.expressionEvaluator = - new DeltaExpressionEvaluator(rewrittenInputSchema, rewrittenPredicate, BooleanType.BOOLEAN); - } - - @Override - public ColumnVector eval(ColumnarBatch inputData, Optional<ColumnVector> existingSelectionVector) { - try { - ColumnVector newVector = existingSelectionVector - .orElse(new DefaultConstantVector(BooleanType.BOOLEAN, inputData.getSize(), true)); - ColumnarBatch withExistingSelVector = - inputData.withNewColumn(inputData.getSchema().length(), EXISTING_SEL_VECTOR_FIELD, newVector); - - return expressionEvaluator.eval(withExistingSelVector); - } finally { - // release the existing selection vector. - Utils.closeCloseables(existingSelectionVector.orElse(null)); - } - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java index 8328ec323c..03285efbe9 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -60,6 +60,7 @@ import io.delta.kernel.Scan; import io.delta.kernel.Snapshot; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; import io.delta.kernel.exceptions.KernelEngineException; import io.delta.kernel.exceptions.KernelException; @@ -104,7 +105,7 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> configureJobConf(appCtx, conf, configuration); confFactory = new ConfFactory(conf); String tableMetadataPath = 
getTablePath(configuration); - Engine engine = DeltaEngine.create(conf); + Engine engine = DefaultEngine.create(conf); io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); Snapshot snapshot; try { @@ -122,7 +123,7 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> ARecordType expectedType = HDFSUtils.getExpectedType(conf); Map<String, FunctionCallInformation> functionCallInformationMap = HDFSUtils.getFunctionCallInformationMap(conf); - StructType fileSchema = snapshot.getSchema(engine); + StructType fileSchema = snapshot.getSchema(); requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap); } catch (IOException e) { throw new RuntimeException(e); @@ -132,26 +133,26 @@ public abstract class DeltaReaderFactory implements IRecordReaderFactory<Object> Expression filterExpression = ((DeltaTableFilterEvaluatorFactory) filterEvaluatorFactory).getFilterExpression(); Scan scan; if (filterExpression != null) { - scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema) - .withFilter(engine, (Predicate) filterExpression).build(); + scan = snapshot.getScanBuilder().withReadSchema(requiredSchema).withFilter((Predicate) filterExpression) + .build(); if (scan.getRemainingFilter().isPresent()) { filterExpressionStr = PredicateSerDe.serializeExpressionToJson(scan.getRemainingFilter().get()); } else { filterExpressionStr = null; } } else { - scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + scan = snapshot.getScanBuilder().withReadSchema(requiredSchema).build(); filterExpressionStr = null; } scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); List<Row> scanFiles; try { scanFiles = getScanFiles(scan, engine); - } catch (UnsupportedOperationException | IllegalStateException e) { + } catch (UnsupportedOperationException | IllegalStateException | KernelEngineException e) { // Delta kernel API failed to apply 
expression due to type mismatch. // We need to fall back to skip applying the filter and return all files. LOGGER.info("Exception encountered while getting delta table files to scan {}", e.getMessage()); - scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + scan = snapshot.getScanBuilder().withReadSchema(requiredSchema).build(); filterExpressionStr = null; scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); scanFiles = getScanFiles(scan, engine); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java deleted file mode 100644 index 388a287bd2..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ElementAtEvaluator.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import static io.delta.kernel.defaults.internal.DefaultEngineErrors.unsupportedExpressionException; -import static io.delta.kernel.internal.util.Preconditions.checkArgument; -import static java.lang.String.format; -import static org.apache.asterix.external.input.record.reader.aws.delta.ImplicitCastExpression.canCastTo; - -import java.util.Arrays; - -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.data.MapValue; -import io.delta.kernel.expressions.Expression; -import io.delta.kernel.expressions.ScalarExpression; -import io.delta.kernel.internal.util.Utils; -import io.delta.kernel.types.DataType; -import io.delta.kernel.types.MapType; -import io.delta.kernel.types.StringType; - -/** - * Utility methods to evaluate {@code element_at} expression. - */ -class ElementAtEvaluator { - private ElementAtEvaluator() { - } - - /** - * Validate and transform the {@code element_at} expression with given validated and - * transformed inputs. - */ - static ScalarExpression validateAndTransform(ScalarExpression elementAt, Expression mapInput, DataType mapInputType, - Expression lookupKey, DataType lookupKeyType) { - - MapType asMapType = validateSupportedMapType(elementAt, mapInputType); - DataType keyTypeFromMapInput = asMapType.getKeyType(); - - if (!keyTypeFromMapInput.equivalent(lookupKeyType)) { - if (canCastTo(lookupKeyType, keyTypeFromMapInput)) { - lookupKey = new ImplicitCastExpression(lookupKey, keyTypeFromMapInput); - } else { - String reason = format("lookup key type (%s) is different from the map key type (%s)", lookupKeyType, - asMapType.getKeyType()); - throw unsupportedExpressionException(elementAt, reason); - } - } - return new ScalarExpression(elementAt.getName(), Arrays.asList(mapInput, lookupKey)); - } - - /** - * Utility method to evaluate the {@code element_at} on given map and key vectors. - * @param map {@link ColumnVector} of {@code map(string, string)} type. 
- * @param lookupKey {@link ColumnVector} of {@code string} type. - * @return result {@link ColumnVector} containing the lookup values. - */ - static ColumnVector eval(ColumnVector map, ColumnVector lookupKey) { - return new ColumnVector() { - // Store the last lookup value to avoid multiple looks up for same row id. - // The general pattern is call `isNullAt(rowId)` followed by `getString`. - // So the cache of one value is enough. - private int lastLookupRowId = -1; - private String lastLookupValue = null; - - @Override - public DataType getDataType() { - return ((MapType) map.getDataType()).getValueType(); - } - - @Override - public int getSize() { - return map.getSize(); - } - - @Override - public void close() { - Utils.closeCloseables(map, lookupKey); - } - - @Override - public boolean isNullAt(int rowId) { - if (rowId == lastLookupRowId) { - return lastLookupValue == null; - } - return map.isNullAt(rowId) || lookupValue(rowId) == null; - } - - @Override - public String getString(int rowId) { - lookupValue(rowId); - return lastLookupValue == null ? null : lastLookupValue; - } - - private Object lookupValue(int rowId) { - if (rowId == lastLookupRowId) { - return lastLookupValue; - } - lastLookupRowId = rowId; - String keyValue = lookupKey.getString(rowId); - lastLookupValue = findValueForKey(map.getMap(rowId), keyValue); - return lastLookupValue; - } - - /** - * Given a {@link MapValue} and string {@code key} find the corresponding value. - * Returns null if the key is not in the map. - * @param mapValue String->String map to search - * @param key the key to look up the value for; may be null - */ - private String findValueForKey(MapValue mapValue, String key) { - ColumnVector keyVector = mapValue.getKeys(); - for (int i = 0; i < mapValue.getSize(); i++) { - if ((keyVector.isNullAt(i) && key == null) - || (!keyVector.isNullAt(i) && keyVector.getString(i).equals(key))) { - return mapValue.getValues().isNullAt(i) ? 
null : mapValue.getValues().getString(i); - } - } - // If the key is not in the map return null - return null; - } - }; - } - - private static MapType validateSupportedMapType(Expression elementAt, DataType mapInputType) { - checkArgument(mapInputType instanceof MapType, "expected a map type input as first argument: " + elementAt); - MapType asMapType = (MapType) mapInputType; - // For now we only need to support lookup in columns of type `map(string -> string)`. - // Additional type support may be added later - if (asMapType.getKeyType().equivalent(StringType.STRING) - && asMapType.getValueType().equivalent(StringType.STRING)) { - return asMapType; - } - throw new UnsupportedOperationException( - format("%s: Supported only on type map(string, string) input data", elementAt)); - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java deleted file mode 100644 index 7e2a65be6d..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ExpressionVisitor.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import static io.delta.kernel.expressions.AlwaysFalse.ALWAYS_FALSE; -import static io.delta.kernel.expressions.AlwaysTrue.ALWAYS_TRUE; -import static java.util.stream.Collectors.joining; - -import java.util.List; -import java.util.Locale; - -import io.delta.kernel.expressions.AlwaysFalse; -import io.delta.kernel.expressions.AlwaysTrue; -import io.delta.kernel.expressions.And; -import io.delta.kernel.expressions.Column; -import io.delta.kernel.expressions.Expression; -import io.delta.kernel.expressions.Literal; -import io.delta.kernel.expressions.Or; -import io.delta.kernel.expressions.PartitionValueExpression; -import io.delta.kernel.expressions.Predicate; -import io.delta.kernel.expressions.ScalarExpression; - -/** - * Interface to allow visiting an expression tree and implementing handling for each - * specific expression type. - * - * @param <R> Return type of result of visit expression methods. 
- */ -abstract class ExpressionVisitor<R> { - - abstract R visitAnd(And and); - - abstract R visitOr(Or or); - - abstract R visitAlwaysTrue(AlwaysTrue alwaysTrue); - - abstract R visitAlwaysFalse(AlwaysFalse alwaysFalse); - - abstract R visitComparator(Predicate predicate); - - abstract R visitLiteral(Literal literal); - - abstract R visitColumn(Column column); - - abstract R visitCast(ImplicitCastExpression cast); - - abstract R visitPartitionValue(PartitionValueExpression partitionValue); - - abstract R visitElementAt(ScalarExpression elementAt); - - abstract R visitNot(Predicate predicate); - - abstract R visitIsNotNull(Predicate predicate); - - abstract R visitIsNull(Predicate predicate); - - abstract R visitCoalesce(ScalarExpression ifNull); - - final R visit(Expression expression) { - if (expression instanceof PartitionValueExpression) { - return visitPartitionValue((PartitionValueExpression) expression); - } else if (expression instanceof ScalarExpression) { - return visitScalarExpression((ScalarExpression) expression); - } else if (expression instanceof Literal) { - return visitLiteral((Literal) expression); - } else if (expression instanceof Column) { - return visitColumn((Column) expression); - } else if (expression instanceof ImplicitCastExpression) { - return visitCast((ImplicitCastExpression) expression); - } - - throw new UnsupportedOperationException(String.format("Expression %s is not supported.", expression)); - } - - private R visitScalarExpression(ScalarExpression expression) { - List<Expression> children = expression.getChildren(); - String name = expression.getName().toUpperCase(Locale.ENGLISH); - switch (name) { - case "ALWAYS_TRUE": - return visitAlwaysTrue(ALWAYS_TRUE); - case "ALWAYS_FALSE": - return visitAlwaysFalse(ALWAYS_FALSE); - case "AND": - return visitAnd(new And(elemAsPredicate(children, 0), elemAsPredicate(children, 1))); - case "OR": - return visitOr(new Or(elemAsPredicate(children, 0), elemAsPredicate(children, 1))); - case "=": 
- case "<": - case "<=": - case ">": - case ">=": - return visitComparator(new Predicate(name, children)); - case "ELEMENT_AT": - return visitElementAt(expression); - case "NOT": - return visitNot(new Predicate(name, children)); - case "IS_NOT_NULL": - return visitIsNotNull(new Predicate(name, children)); - case "IS_NULL": - return visitIsNull(new Predicate(name, children)); - case "COALESCE": - return visitCoalesce(expression); - default: - throw new UnsupportedOperationException( - String.format("Scalar expression `%s` is not supported.", name)); - } - } - - private static Predicate elemAsPredicate(List<Expression> expressions, int index) { - if (expressions.size() <= index) { - throw new RuntimeException(String.format("Trying to access invalid entry (%d) in list %s", index, - expressions.stream().map(Object::toString).collect(joining(",")))); - } - Expression elemExpression = expressions.get(index); - if (!(elemExpression instanceof Predicate)) { - throw new RuntimeException("Expected a predicate, but got " + elemExpression); - } - return (Predicate) expressions.get(index); - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java deleted file mode 100644 index febd4d3160..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/ImplicitCastExpression.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import static java.lang.String.format; -import static java.util.Collections.unmodifiableMap; -import static java.util.Objects.requireNonNull; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.expressions.Expression; -import io.delta.kernel.types.DataType; - -/** - * An implicit cast expression to convert the input type to another given type. Here is the valid - * list of casts - * <p> - * <ul> - * <li>{@code byte} to {@code short, int, long, float, double}</li> - * <li>{@code short} to {@code int, long, float, double}</li> - * <li>{@code int} to {@code long, float, double}</li> - * <li>{@code long} to {@code float, double}</li> - * <li>{@code float} to {@code double}</li> - * </ul> - * - * <p> - * The above list is not exhaustive. Based on the need, we can add more casts. - * <p> - */ -final class ImplicitCastExpression implements Expression { - private final Expression input; - private final DataType outputType; - - /** - * Create a cast around the given input expression to specified output data - * type. 
It is the responsibility of the caller to validate the input expression can be cast - * to the new type using {@link #canCastTo(DataType, DataType)} - */ - ImplicitCastExpression(Expression input, DataType outputType) { - this.input = requireNonNull(input, "input is null"); - this.outputType = requireNonNull(outputType, "outputType is null"); - } - - public Expression getInput() { - return input; - } - - public DataType getOutputType() { - return outputType; - } - - @Override - public List<Expression> getChildren() { - return Collections.singletonList(input); - } - - /** - * Evaluate the given column expression on the input {@link ColumnVector}. - * - * @param input {@link ColumnVector} data of the input to the cast expression. - * @return {@link ColumnVector} result applying target type casting on every element in the - * input {@link ColumnVector}. - */ - ColumnVector eval(ColumnVector input) { - String fromTypeStr = input.getDataType().toString(); - switch (fromTypeStr) { - case "byte": - return new ByteUpConverter(outputType, input); - case "short": - return new ShortUpConverter(outputType, input); - case "integer": - return new IntUpConverter(outputType, input); - case "long": - return new LongUpConverter(outputType, input); - case "float": - return new FloatUpConverter(outputType, input); - default: - throw new UnsupportedOperationException(format("Cast from %s is not supported", fromTypeStr)); - } - } - - /** - * Map containing for each type what are the target cast types can be. 
- */ - private static final Map<String, List<String>> UP_CASTABLE_TYPE_TABLE; - - static { - Map<String, List<String>> map = new HashMap<>(); - map.put("byte", Arrays.asList("short", "integer", "long", "float", "double")); - map.put("short", Arrays.asList("integer", "long", "float", "double")); - map.put("integer", Arrays.asList("long", "float", "double")); - map.put("long", Arrays.asList("float", "double")); - map.put("float", Arrays.asList("double")); - UP_CASTABLE_TYPE_TABLE = unmodifiableMap(map); - } - - /** - * Utility method which returns whether the given {@code from} type can be cast to {@code to} - * type. - */ - static boolean canCastTo(DataType from, DataType to) { - // TODO: The type name should be a first class method on `DataType` instead of getting it - // using the `toString`. - String fromStr = from.toString(); - String toStr = to.toString(); - return UP_CASTABLE_TYPE_TABLE.containsKey(fromStr) && UP_CASTABLE_TYPE_TABLE.get(fromStr).contains(toStr); - } - - /** - * Base class for up casting {@link ColumnVector} data. 
- */ - private abstract static class UpConverter implements ColumnVector { - protected final DataType targetType; - protected final ColumnVector inputVector; - - UpConverter(DataType targetType, ColumnVector inputVector) { - this.targetType = targetType; - this.inputVector = inputVector; - } - - @Override - public DataType getDataType() { - return targetType; - } - - @Override - public boolean isNullAt(int rowId) { - return inputVector.isNullAt(rowId); - } - - @Override - public int getSize() { - return inputVector.getSize(); - } - - @Override - public void close() { - inputVector.close(); - } - } - - private static class ByteUpConverter extends UpConverter { - ByteUpConverter(DataType targetType, ColumnVector inputVector) { - super(targetType, inputVector); - } - - @Override - public short getShort(int rowId) { - return inputVector.getByte(rowId); - } - - @Override - public int getInt(int rowId) { - return inputVector.getByte(rowId); - } - - @Override - public long getLong(int rowId) { - return inputVector.getByte(rowId); - } - - @Override - public float getFloat(int rowId) { - return inputVector.getByte(rowId); - } - - @Override - public double getDouble(int rowId) { - return inputVector.getByte(rowId); - } - } - - private static class ShortUpConverter extends UpConverter { - ShortUpConverter(DataType targetType, ColumnVector inputVector) { - super(targetType, inputVector); - } - - @Override - public int getInt(int rowId) { - return inputVector.getShort(rowId); - } - - @Override - public long getLong(int rowId) { - return inputVector.getShort(rowId); - } - - @Override - public float getFloat(int rowId) { - return inputVector.getShort(rowId); - } - - @Override - public double getDouble(int rowId) { - return inputVector.getShort(rowId); - } - } - - private static class IntUpConverter extends UpConverter { - IntUpConverter(DataType targetType, ColumnVector inputVector) { - super(targetType, inputVector); - } - - @Override - public long getLong(int rowId) { - return 
inputVector.getInt(rowId); - } - - @Override - public float getFloat(int rowId) { - return inputVector.getInt(rowId); - } - - @Override - public double getDouble(int rowId) { - return inputVector.getInt(rowId); - } - } - - private static class LongUpConverter extends UpConverter { - LongUpConverter(DataType targetType, ColumnVector inputVector) { - super(targetType, inputVector); - } - - @Override - public float getFloat(int rowId) { - return inputVector.getLong(rowId); - } - - @Override - public double getDouble(int rowId) { - return inputVector.getLong(rowId); - } - } - - private static class FloatUpConverter extends UpConverter { - FloatUpConverter(DataType targetType, ColumnVector inputVector) { - super(targetType, inputVector); - } - - @Override - public double getDouble(int rowId) { - return inputVector.getFloat(rowId); - } - } -} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java deleted file mode 100644 index c24b225e7a..0000000000 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/PartitionValueEvaluator.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.external.input.record.reader.aws.delta; - -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; -import java.time.Instant; -import java.time.temporal.ChronoUnit; - -import io.delta.kernel.data.ColumnVector; -import io.delta.kernel.internal.util.InternalUtils; -import io.delta.kernel.types.DataType; -import io.delta.kernel.types.DateType; -import io.delta.kernel.types.IntegerType; -import io.delta.kernel.types.LongType; -import io.delta.kernel.types.TimestampNTZType; -import io.delta.kernel.types.TimestampType; - -/** - * Utility methods to evaluate {@code partition_value} expression - */ -class PartitionValueEvaluator { - /** - * Evaluate the {@code partition_value} expression for given input column vector and generate - * a column vector with decoded values according to the given partition type. 
- */ - static ColumnVector eval(ColumnVector input, DataType partitionType) { - return new ColumnVector() { - @Override - public DataType getDataType() { - return partitionType; - } - - @Override - public int getSize() { - return input.getSize(); - } - - @Override - public void close() { - input.close(); - } - - @Override - public boolean isNullAt(int rowId) { - return input.isNullAt(rowId); - } - - @Override - public boolean getBoolean(int rowId) { - return Boolean.parseBoolean(input.getString(rowId)); - } - - @Override - public byte getByte(int rowId) { - return Byte.parseByte(input.getString(rowId)); - } - - @Override - public short getShort(int rowId) { - return Short.parseShort(input.getString(rowId)); - } - - @Override - public int getInt(int rowId) { - if (partitionType.equivalent(IntegerType.INTEGER)) { - return Integer.parseInt(input.getString(rowId)); - } else if (partitionType.equivalent(DateType.DATE)) { - return InternalUtils.daysSinceEpoch(Date.valueOf(input.getString(rowId))); - } - throw new UnsupportedOperationException("Invalid value request for data type"); - } - - @Override - public long getLong(int rowId) { - if (partitionType.equivalent(LongType.LONG)) { - return Long.parseLong(input.getString(rowId)); - } else if (partitionType.equivalent(TimestampType.TIMESTAMP) - || partitionType.equivalent(TimestampNTZType.TIMESTAMP_NTZ)) { - // Both the timestamp and timestamp_ntz have no timezone info, - // so they are interpreted in local time zone. 
- try { - Timestamp timestamp = Timestamp.valueOf(input.getString(rowId)); - return InternalUtils.microsSinceEpoch(timestamp); - } catch (IllegalArgumentException e) { - Instant instant = Instant.parse(input.getString(rowId)); - return ChronoUnit.MICROS.between(Instant.EPOCH, instant); - } - } - throw new UnsupportedOperationException("Invalid value request for data type"); - } - - @Override - public float getFloat(int rowId) { - return Float.parseFloat(input.getString(rowId)); - } - - @Override - public double getDouble(int rowId) { - return Double.parseDouble(input.getString(rowId)); - } - - @Override - public byte[] getBinary(int rowId) { - return input.isNullAt(rowId) ? null : input.getString(rowId).getBytes(); - } - - @Override - public String getString(int rowId) { - return input.getString(rowId); - } - - @Override - public BigDecimal getDecimal(int rowId) { - return input.isNullAt(rowId) ? null : new BigDecimal(input.getString(rowId)); - } - }; - } -} diff --git a/hyracks-fullstack/pom.xml b/hyracks-fullstack/pom.xml index fc3950bd93..6d8a19bc27 100644 --- a/hyracks-fullstack/pom.xml +++ b/hyracks-fullstack/pom.xml @@ -474,6 +474,11 @@ <artifactId>jackson-datatype-jsr310</artifactId> <version>${jackson.version}</version> </dependency> + <dependency> + <groupId>com.fasterxml.jackson.datatype</groupId> + <artifactId>jackson-datatype-jdk8</artifactId> + <version>${jackson.version}</version> + </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> @@ -677,6 +682,11 @@ <artifactId>hadoop-client-runtime</artifactId> <version>3.4.1</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-api</artifactId> + <version>3.4.1</version> + </dependency> </dependencies> </dependencyManagement> <build>
