This is an automated email from the ASF dual-hosted git repository. etudenhoefner pushed a commit to branch spark-uuid-read-write-support-3.4 in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit ced5a0737fa608645f5ae386b387944912cb697d Author: Eduard Tudenhoefner <[email protected]> AuthorDate: Mon Apr 24 17:17:08 2023 +0200 Add vectorization support for UTF8String --- .../java/org/apache/iceberg/util/RandomUtil.java | 4 ++++ .../GenericArrowVectorAccessorFactory.java | 27 +++++++++++++++++++++- .../vectorized/ArrowVectorAccessorFactory.java | 7 ++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java index a84dc4d8f8..9131e61661 100644 --- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java @@ -182,6 +182,10 @@ public class RandomUtil { BigInteger unscaled = new BigInteger(String.valueOf(value + 1)); BigDecimal bd = new BigDecimal(unscaled, type.scale()); return negate(value) ? bd.negate() : bd; + case UUID: + byte[] uuidBytes = new byte[16]; + random.nextBytes(uuidBytes); + return uuidBytes; default: throw new IllegalArgumentException( "Cannot generate random value for unknown type: " + primitive); diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java index e9305e399c..a988516bc6 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java @@ -218,7 +218,8 @@ public class GenericArrowVectorAccessorFactory< return new FixedSizeBinaryBackedDecimalAccessor<>( (FixedSizeBinaryVector) vector, decimalFactorySupplier.get()); } - return new FixedSizeBinaryAccessor<>((FixedSizeBinaryVector) vector); + return new FixedSizeBinaryAccessor<>( + (FixedSizeBinaryVector) vector, stringFactorySupplier.get()); } throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass()); } @@ -558,16 +559,32 @@ public class GenericArrowVectorAccessorFactory< extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> { private final FixedSizeBinaryVector vector; + private final StringFactory<Utf8StringT> stringFactory; FixedSizeBinaryAccessor(FixedSizeBinaryVector vector) { super(vector); this.vector = vector; + this.stringFactory = null; + } + + FixedSizeBinaryAccessor( + FixedSizeBinaryVector vector, StringFactory<Utf8StringT> stringFactory) { + super(vector); + this.vector = vector; + this.stringFactory = stringFactory; } @Override public byte[] getBinary(int rowId) { return vector.get(rowId); } + + @Override + public Utf8StringT getUTF8String(int rowId) { + return null == stringFactory + ? super.getUTF8String(rowId) + : stringFactory.ofRow(vector, rowId); + } } private static class ArrayAccessor< @@ -794,6 +811,14 @@ public class GenericArrowVectorAccessorFactory< /** Create a UTF8 String from the row value in the arrow vector. */ Utf8StringT ofRow(VarCharVector vector, int rowId); + /** Create a UTF8 String from the row value in the FixedSizeBinaryVector vector. */ + default Utf8StringT ofRow(FixedSizeBinaryVector vector, int rowId) { + throw new UnsupportedOperationException( + String.format( + "Creating %s from a FixedSizeBinaryVector is not supported", + getGenericClass().getSimpleName())); + } + /** Create a UTF8 String from the byte array. */ Utf8StringT ofBytes(byte[] bytes); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java index e32ebcb02b..29e938bb09 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java @@ -21,10 +21,12 @@ package org.apache.iceberg.spark.data.vectorized; import java.math.BigDecimal; import java.nio.ByteBuffer; import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.vector.FixedSizeBinaryVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory; +import org.apache.iceberg.util.UUIDUtil; import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.vectorized.ArrowColumnVector; import org.apache.spark.sql.vectorized.ColumnarArray; @@ -74,6 +76,11 @@ final class ArrowVectorAccessorFactory null, vector.getDataBuffer().memoryAddress() + start, end - start); } + @Override + public UTF8String ofRow(FixedSizeBinaryVector vector, int rowId) { + return UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString()); + } + @Override public UTF8String ofBytes(byte[] bytes) { return UTF8String.fromBytes(bytes);
