This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 09140e5283 Spark: Support Parquet dictionary encoded UUIDs (#13324)
09140e5283 is described below

commit 09140e52836048b112c42c9cfe721295bd57048b
Author: Fokko Driesprong <[email protected]>
AuthorDate: Thu Jul 10 00:29:16 2025 +0200

    Spark: Support Parquet dictionary encoded UUIDs (#13324)
    
    * Spark: Support Parquet dictionary encoded UUIDs
    
    While fixing some issues on the PyIceberg end to fully support UUIDs:
    https://github.com/apache/iceberg-python/pull/2007
    
    I noticed this issue and was surprised, since UUIDs used to work with
    Spark, but it turns out that reading dictionary-encoded UUIDs was not
    implemented yet.
    
    For PyIceberg we only generate a small amount of data, which is why this
    wasn't caught previously.
    
    * Add another test
---
 .../GenericArrowVectorAccessorFactory.java         | 22 ++++++++++++++++++++--
 .../vectorized/ArrowVectorAccessorFactory.java     |  8 ++++++++
 .../parquet/TestParquetVectorizedReads.java        | 15 +++++++++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
index ecbc1cf1d9..2e24ce2e87 100644
--- 
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
+++ 
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
@@ -156,7 +156,8 @@ public class GenericArrowVectorAccessorFactory<
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
         case BINARY:
-          return new DictionaryBinaryAccessor<>((IntVector) vector, 
dictionary);
+          return new DictionaryBinaryAccessor<>(
+              (IntVector) vector, dictionary, stringFactorySupplier.get());
         case FLOAT:
           return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
         case INT64:
@@ -452,17 +453,27 @@ public class GenericArrowVectorAccessorFactory<
       extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> 
{
     private final IntVector offsetVector;
     private final Dictionary dictionary;
+    private final StringFactory<Utf8StringT> stringFactory;
 
-    DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
+    DictionaryBinaryAccessor(
+        IntVector vector, Dictionary dictionary, StringFactory<Utf8StringT> 
stringFactory) {
       super(vector);
       this.offsetVector = vector;
       this.dictionary = dictionary;
+      this.stringFactory = stringFactory;
     }
 
     @Override
     public final byte[] getBinary(int rowId) {
       return dictionary.decodeToBinary(offsetVector.get(rowId)).getBytes();
     }
+
+    @Override
+    public Utf8StringT getUTF8String(int rowId) {
+      return null == stringFactory
+          ? super.getUTF8String(rowId)
+          : stringFactory.ofRow(offsetVector, dictionary, rowId);
+    }
   }
 
   private static class DictionaryTimestampInt96Accessor<
@@ -815,6 +826,13 @@ public class GenericArrowVectorAccessorFactory<
               getGenericClass().getSimpleName()));
     }
 
+    /** Create a UTF8 String from the row value in the Dictionary. */
+    default Utf8StringT ofRow(IntVector offsetVector, Dictionary dictionary, 
int rowId) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "Creating %s from a Dictionary is not supported", 
getGenericClass().getSimpleName()));
+    }
+
     /** Create a UTF8 String from the byte array. */
     Utf8StringT ofBytes(byte[] bytes);
 
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
index 29e938bb09..b4bb9a9187 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ArrowVectorAccessorFactory.java
@@ -22,11 +22,13 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory;
 import org.apache.iceberg.util.UUIDUtil;
+import org.apache.parquet.column.Dictionary;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.sql.vectorized.ArrowColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarArray;
@@ -81,6 +83,12 @@ final class ArrowVectorAccessorFactory
       return 
UTF8String.fromString(UUIDUtil.convert(vector.get(rowId)).toString());
     }
 
+    @Override
+    public UTF8String ofRow(IntVector offsetVector, Dictionary dictionary, int 
rowId) {
+      byte[] bytes = 
dictionary.decodeToBinary(offsetVector.get(rowId)).getBytes();
+      return UTF8String.fromString(UUIDUtil.convert(bytes).toString());
+    }
+
     @Override
     public UTF8String ofBytes(byte[] bytes) {
       return UTF8String.fromBytes(bytes);
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
index 67712546f6..990beec2fd 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/vectorized/parquet/TestParquetVectorizedReads.java
@@ -400,4 +400,19 @@ public class TestParquetVectorizedReads extends 
AvroDataTest {
         .hasMessageStartingWith("Cannot support vectorized reads for column")
         .hasMessageEndingWith("Disable vectorized reads to read this 
table/file");
   }
+
+  @Test
+  public void testUuidReads() throws Exception {
+    // Just one row to maintain dictionary encoding
+    int numRows = 1;
+    Schema schema = new Schema(optional(100, "uuid", Types.UUIDType.get()));
+
+    File dataFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(dataFile.delete()).as("Delete should succeed").isTrue();
+    Iterable<Record> data = generateData(schema, numRows, 0L, 0, IDENTITY);
+    try (FileAppender<Record> writer = getParquetV2Writer(schema, dataFile)) {
+      writer.addAll(data);
+    }
+    assertRecordsMatch(schema, numRows, data, dataFile, false, BATCH_SIZE);
+  }
 }

Reply via email to