This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch spark-uuid-read-write-support-3.4
in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 7fcc85b044d70e382443275ad0ee090c79d9193b
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Apr 26 09:12:09 2023 +0200

    address review feedback
---
 api/src/main/java/org/apache/iceberg/util/UUIDUtil.java  | 12 +++++++++++-
 .../apache/iceberg/spark/data/SparkOrcValueReaders.java  | 16 ++--------------
 .../org/apache/iceberg/spark/data/SparkOrcWriter.java    |  3 ++-
 .../apache/iceberg/spark/data/SparkParquetWriters.java   | 15 ++++++---------
 4 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
index 4cedb5bd22..b72feec00b 100644
--- a/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UUIDUtil.java
@@ -62,7 +62,17 @@ public class UUIDUtil {
   }
 
   public static ByteBuffer convertToByteBuffer(UUID value) {
-    ByteBuffer buffer = ByteBuffer.allocate(16);
+    return convertToByteBuffer(value, null);
+  }
+
+  public static ByteBuffer convertToByteBuffer(UUID value, ByteBuffer reuse) {
+    ByteBuffer buffer;
+    if (reuse != null) {
+      buffer = reuse;
+    } else {
+      buffer = ByteBuffer.allocate(16);
+    }
+
     buffer.order(ByteOrder.BIG_ENDIAN);
     buffer.putLong(0, value.getMostSignificantBits());
     buffer.putLong(8, value.getLeastSignificantBits());
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
index 2bc5ef96a3..670537fbf8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueReaders.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.spark.data;
 
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.orc.OrcValueReader;
@@ -178,14 +177,6 @@ public class SparkOrcValueReaders {
   }
 
   private static class UUIDReader implements OrcValueReader<UTF8String> {
-    private static final ThreadLocal<ByteBuffer> BUFFER =
-        ThreadLocal.withInitial(
-            () -> {
-              ByteBuffer buffer = ByteBuffer.allocate(16);
-              buffer.order(ByteOrder.BIG_ENDIAN);
-              return buffer;
-            });
-
     private static final UUIDReader INSTANCE = new UUIDReader();
 
     private UUIDReader() {}
@@ -193,11 +184,8 @@ public class SparkOrcValueReaders {
     @Override
     public UTF8String nonNullRead(ColumnVector vector, int row) {
       BytesColumnVector bytesVector = (BytesColumnVector) vector;
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.put(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
-      buffer.rewind();
-
+      ByteBuffer buffer =
+          ByteBuffer.wrap(bytesVector.vector[row], bytesVector.start[row], bytesVector.length[row]);
       return UTF8String.fromString(UUIDUtil.convert(buffer).toString());
     }
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index c5477fac08..6b799e677b 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -178,7 +178,8 @@ public class SparkOrcWriter implements OrcRowWriter<InternalRow> {
       case BINARY:
         if (ORCSchemaUtil.BinaryType.UUID
             .toString()
-            .equals(fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
+            .equalsIgnoreCase(
+                fieldType.getAttributeValue(ORCSchemaUtil.ICEBERG_BINARY_TYPE_ATTRIBUTE))) {
           fieldGetter = SpecializedGetters::getUTF8String;
         } else {
           fieldGetter = SpecializedGetters::getBinary;
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
index c1abec96cd..af6f65a089 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetWriters.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
@@ -213,6 +214,10 @@ public class SparkParquetWriters {
     return new UTF8StringWriter(desc);
   }
 
+  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
   private static PrimitiveWriter<Decimal> decimalAsInteger(
       ColumnDescriptor desc, int precision, int scale) {
     return new IntegerDecimalWriter(desc, precision, scale);
@@ -323,10 +328,6 @@ public class SparkParquetWriters {
     }
   }
 
-  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
-    return new UUIDWriter(desc);
-  }
-
   private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
     private static final ThreadLocal<ByteBuffer> BUFFER =
         ThreadLocal.withInitial(
@@ -343,11 +344,7 @@
     @Override
     public void write(int repetitionLevel, UTF8String string) {
       UUID uuid = UUID.fromString(string.toString());
-      ByteBuffer buffer = BUFFER.get();
-      buffer.rewind();
-      buffer.putLong(uuid.getMostSignificantBits());
-      buffer.putLong(uuid.getLeastSignificantBits());
-      buffer.rewind();
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
     }
   }
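For illustration, a minimal sketch of the write-side pattern the new
UUIDUtil.convertToByteBuffer(UUID, ByteBuffer) overload enables, mirroring what
SparkParquetWriters.UUIDWriter does in the diff above. The class name
UuidWriteSketch is hypothetical; only the UUIDUtil calls come from this commit,
and the sketch assumes iceberg-api is on the classpath.

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;

    public class UuidWriteSketch {
      // Per-thread 16-byte scratch buffer. The overload sets BIG_ENDIAN order
      // itself and writes via absolute puts, so the buffer's position never
      // moves and no rewind is needed between calls.
      private static final ThreadLocal<ByteBuffer> BUFFER =
          ThreadLocal.withInitial(() -> ByteBuffer.allocate(16));

      static ByteBuffer toBytes(UUID uuid) {
        // Reuses the thread-local buffer instead of allocating per value.
        return UUIDUtil.convertToByteBuffer(uuid, BUFFER.get());
      }

      public static void main(String[] args) {
        UUID uuid = UUID.randomUUID();
        // Round-trip through UUIDUtil.convert to check the encoding.
        System.out.println(uuid.equals(UUIDUtil.convert(toBytes(uuid)))); // true
      }
    }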

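A companion sketch for the read side: the ORC reader above now wraps the column
vector's backing array directly instead of copying each value into a
thread-local buffer. The plain byte array below is a hypothetical stand-in for
a BytesColumnVector's vector[row]/start[row]/length[row] triple.

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;

    public class UuidReadSketch {
      public static void main(String[] args) {
        UUID expected = UUID.randomUUID();
        // Stand-in for the bytes a BytesColumnVector row would hold.
        byte[] backing = UUIDUtil.convertToByteBuffer(expected).array();

        // Zero-copy view over the 16-byte slice; no scratch buffer or
        // defensive copy is needed on the read path.
        ByteBuffer view = ByteBuffer.wrap(backing, 0, 16);
        System.out.println(expected.equals(UUIDUtil.convert(view))); // true
      }
    }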