This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bd046f844a Spark: Fix issue when partitioning by UUID (#8250)
bd046f844a is described below

commit bd046f844a1cbad6c98919d8ea63176aeae78d33
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu May 16 18:48:58 2024 +0200

    Spark: Fix issue when partitioning by UUID (#8250)
---
 .../java/org/apache/iceberg/RecordWrapperTest.java |  4 ++--
 .../iceberg/spark/source/WritersBenchmark.java     |  9 +++++---
 .../apache/iceberg/spark/source/BaseReader.java    |  4 +++-
 .../iceberg/spark/source/InternalRowWrapper.java   | 26 +++++++++++++++++----
 .../spark/source/SparkPartitionedFanoutWriter.java |  2 +-
 .../spark/source/SparkPartitionedWriter.java       |  2 +-
 .../spark/source/SparkPositionDeltaWrite.java      |  5 ++--
 .../apache/iceberg/spark/source/SparkWrite.java    |  2 +-
 .../spark/source/TestInternalRowWrapper.java       |  3 ++-
 .../spark/source/TestSparkAppenderFactory.java     |  2 +-
 .../spark/source/TestSparkFileWriterFactory.java   |  2 +-
 .../spark/source/TestSparkPartitioningWriters.java |  2 +-
 .../source/TestSparkPositionDeltaWriters.java      |  2 +-
 .../spark/source/TestSparkReaderDeletes.java       |  3 ++-
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 27 ++++++++++++++++++++++
 .../iceberg/spark/source/WritersBenchmark.java     |  9 +++++---
 .../apache/iceberg/spark/source/BaseReader.java    |  4 +++-
 .../iceberg/spark/source/InternalRowWrapper.java   | 26 +++++++++++++++++----
 .../spark/source/SparkPartitionedFanoutWriter.java |  2 +-
 .../spark/source/SparkPartitionedWriter.java       |  2 +-
 .../spark/source/SparkPositionDeltaWrite.java      |  5 ++--
 .../apache/iceberg/spark/source/SparkWrite.java    |  2 +-
 .../spark/source/TestInternalRowWrapper.java       |  3 ++-
 .../spark/source/TestSparkAppenderFactory.java     |  2 +-
 .../spark/source/TestSparkFileWriterFactory.java   |  2 +-
 .../spark/source/TestSparkPartitioningWriters.java |  2 +-
 .../source/TestSparkPositionDeltaWriters.java      |  2 +-
 .../spark/source/TestSparkReaderDeletes.java       |  3 ++-
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 26 +++++++++++++++++++++
 .../iceberg/spark/source/WritersBenchmark.java     |  9 +++++---
 .../apache/iceberg/spark/source/BaseReader.java    |  4 +++-
 .../iceberg/spark/source/InternalRowWrapper.java   | 26 +++++++++++++++++----
 .../spark/source/SparkPartitionedFanoutWriter.java |  2 +-
 .../spark/source/SparkPartitionedWriter.java       |  2 +-
 .../spark/source/SparkPositionDeltaWrite.java      |  5 ++--
 .../apache/iceberg/spark/source/SparkWrite.java    |  2 +-
 .../spark/source/TestInternalRowWrapper.java       |  3 ++-
 .../spark/source/TestSparkAppenderFactory.java     |  2 +-
 .../spark/source/TestSparkFileWriterFactory.java   |  2 +-
 .../spark/source/TestSparkPartitioningWriters.java |  2 +-
 .../source/TestSparkPositionDeltaWriters.java      |  2 +-
 .../spark/source/TestSparkReaderDeletes.java       |  3 ++-
 .../apache/iceberg/spark/sql/TestCreateTable.java  | 26 +++++++++++++++++++++
 43 files changed, 216 insertions(+), 59 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java 
b/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java
index 22b928d238..94e9825fa5 100644
--- a/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java
+++ b/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java
@@ -44,8 +44,8 @@ public abstract class RecordWrapperTest {
           optional(113, "bytes", Types.BinaryType.get()),
           required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
           required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
-          required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // maximum 
precision
-          );
+          required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum 
precision
+          optional(117, "uuid", Types.UUIDType.get()));
 
   private static final Types.StructType TIMESTAMP_WITHOUT_ZONE =
       Types.StructType.of(
diff --git 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 13ff034e4b..dd2cf80716 100644
--- 
a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ 
b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -193,7 +193,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(dataSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(dataSparkType, table().schema().asStruct());
 
     try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -256,7 +257,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(dataSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(dataSparkType, table().schema().asStruct());
 
     try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -324,7 +326,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(deleteSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(deleteSparkType, table().schema().asStruct());
 
     try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 4fb838202c..0882edcb7c 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -259,7 +259,9 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
 
     SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter 
counter) {
       super(filePath, deletes, tableSchema, expectedSchema, counter);
-      this.asStructLike = new 
InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+      this.asStructLike =
+          new InternalRowWrapper(
+              SparkSchemaUtil.convert(requiredSchema()), 
requiredSchema().asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
index 524266f6f8..d1682b8c85 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
@@ -19,9 +19,13 @@
 package org.apache.iceberg.spark.source;
 
 import java.nio.ByteBuffer;
+import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.stream.Stream;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
@@ -40,9 +44,17 @@ class InternalRowWrapper implements StructLike {
   private InternalRow row = null;
 
   @SuppressWarnings("unchecked")
-  InternalRowWrapper(StructType rowType) {
+  InternalRowWrapper(StructType rowType, Types.StructType icebergSchema) {
     this.types = 
Stream.of(rowType.fields()).map(StructField::dataType).toArray(DataType[]::new);
-    this.getters = 
Stream.of(types).map(InternalRowWrapper::getter).toArray(BiFunction[]::new);
+    Preconditions.checkArgument(
+        types.length == icebergSchema.fields().size(),
+        "Invalid length: Spark struct type (%s) != Iceberg struct type (%s)",
+        types.length,
+        icebergSchema.fields().size());
+    this.getters = new BiFunction[types.length];
+    for (int i = 0; i < types.length; i++) {
+      getters[i] = getter(icebergSchema.fields().get(i).type(), types[i]);
+    }
   }
 
   InternalRowWrapper wrap(InternalRow internalRow) {
@@ -71,8 +83,13 @@ class InternalRowWrapper implements StructLike {
     row.update(pos, value);
   }
 
-  private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
+  private static BiFunction<InternalRow, Integer, ?> getter(Type icebergType, 
DataType type) {
     if (type instanceof StringType) {
+      // Spark represents UUIDs as strings
+      if (Type.TypeID.UUID == icebergType.typeId()) {
+        return (row, pos) -> 
UUID.fromString(row.getUTF8String(pos).toString());
+      }
+
       return (row, pos) -> row.getUTF8String(pos).toString();
     } else if (type instanceof DecimalType) {
       DecimalType decimal = (DecimalType) type;
@@ -82,7 +99,8 @@ class InternalRowWrapper implements StructLike {
       return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
     } else if (type instanceof StructType) {
       StructType structType = (StructType) type;
-      InternalRowWrapper nestedWrapper = new InternalRowWrapper(structType);
+      InternalRowWrapper nestedWrapper =
+          new InternalRowWrapper(structType, icebergType.asStructType());
       return (row, pos) -> nestedWrapper.wrap(row.getStruct(pos, 
structType.size()));
     }
 
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
index f17cd260f9..c34ad2f3ad 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedFanoutWriter extends 
PartitionedFanoutWriter<Intern
       StructType sparkSchema) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
-    this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+    this.internalRowWrapper = new InternalRowWrapper(sparkSchema, 
schema.asStruct());
   }
 
   @Override
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
index a860916443..6904446829 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedWriter extends 
PartitionedWriter<InternalRow> {
       StructType sparkSchema) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
-    this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+    this.internalRowWrapper = new InternalRowWrapper(sparkSchema, 
schema.asStruct());
   }
 
   @Override
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index ce4b248e0f..2d38292439 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -375,7 +375,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
     protected InternalRowWrapper initPartitionRowWrapper(Types.StructType 
partitionType) {
       StructType sparkPartitionType = (StructType) 
SparkSchemaUtil.convert(partitionType);
-      return new InternalRowWrapper(sparkPartitionType);
+      return new InternalRowWrapper(sparkPartitionType, partitionType);
     }
 
     protected Map<Integer, StructProjection> buildPartitionProjections(
@@ -645,7 +645,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
       this.dataSpec = table.spec();
       this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());
-      this.internalRowDataWrapper = new 
InternalRowWrapper(context.dataSparkType());
+      this.internalRowDataWrapper =
+          new InternalRowWrapper(context.dataSparkType(), 
context.dataSchema().asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 59fecd45cc..c26b7f5f38 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -741,7 +741,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
       this.io = io;
       this.spec = spec;
       this.partitionKey = new PartitionKey(spec, dataSchema);
-      this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+      this.internalRowWrapper = new InternalRowWrapper(dataSparkType, 
dataSchema.asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
index 9e75145faf..1b4698fe5b 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
@@ -53,7 +53,8 @@ public class TestInternalRowWrapper extends RecordWrapperTest 
{
     Iterable<InternalRow> rowList = RandomData.generateSpark(schema, 
numRecords, 101L);
 
     InternalRecordWrapper recordWrapper = new 
InternalRecordWrapper(schema.asStruct());
-    InternalRowWrapper rowWrapper = new 
InternalRowWrapper(SparkSchemaUtil.convert(schema));
+    InternalRowWrapper rowWrapper =
+        new InternalRowWrapper(SparkSchemaUtil.convert(schema), 
schema.asStruct());
 
     Iterator<Record> actual = recordList.iterator();
     Iterator<InternalRow> expected = rowList.iterator();
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index 1f4c613f74..0664400c79 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -56,7 +56,7 @@ public class TestSparkAppenderFactory extends 
TestAppenderFactory<InternalRow> {
   protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
index 8f4b3042b1..575e6658db 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
@@ -61,7 +61,7 @@ public class TestSparkFileWriterFactory extends 
TestFileWriterFactory<InternalRo
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 5e8107806a..979abd21e7 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPartitioningWriters extends 
TestPartitioningWriters<Intern
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
index baac1efe0e..9dc56abf9f 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPositionDeltaWriters extends 
TestPositionDeltaWriters<Inte
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 88fd3d96dd..76a4143fcb 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -315,7 +315,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests 
{
           new EqualityDeleteRowReader(task, table, null, table.schema(), 
false)) {
         while (reader.next()) {
           actualRowSet.add(
-              new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()))
+              new InternalRowWrapper(
+                      SparkSchemaUtil.convert(table.schema()), 
table.schema().asStruct())
                   .wrap(reader.get().copy()));
         }
       }
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index a6256afcdf..3a62361590 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.sql;
 
 import java.io.File;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionSpec;
@@ -33,6 +34,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -104,6 +106,31 @@ public class TestCreateTable extends SparkCatalogTestBase {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTablePartitionedByUUID() {
+    Assertions.assertThat(validationCatalog.tableExists(tableIdent)).isFalse();
+    Schema schema = new Schema(1, Types.NestedField.optional(1, "uuid", 
Types.UUIDType.get()));
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uuid", 
16).build();
+    validationCatalog.createTable(tableIdent, schema, spec);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assertions.assertThat(table).isNotNull();
+
+    StructType expectedSchema =
+        StructType.of(Types.NestedField.optional(1, "uuid", 
Types.UUIDType.get()));
+    Assertions.assertThat(table.schema().asStruct()).isEqualTo(expectedSchema);
+    Assertions.assertThat(table.spec().fields()).hasSize(1);
+
+    String uuid = UUID.randomUUID().toString();
+
+    sql("INSERT INTO %s VALUES('%s')", tableName, uuid);
+
+    Assertions.assertThat(sql("SELECT uuid FROM %s", tableName))
+        .hasSize(1)
+        .element(0)
+        .isEqualTo(row(uuid));
+  }
+
   @Test
   public void testCreateTableInRootNamespace() {
     Assume.assumeTrue(
diff --git 
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
 
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 71813c5a63..963159fe4e 100644
--- 
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ 
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -220,7 +220,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(dataSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(dataSparkType, table().schema().asStruct());
 
     try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -283,7 +284,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(dataSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(dataSparkType, table().schema().asStruct());
 
     try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -351,7 +353,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(deleteSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(deleteSparkType, table().schema().asStruct());
 
     try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index c2b3e7c2dc..238069e1c9 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -264,7 +264,9 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
 
     SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter 
counter) {
       super(filePath, deletes, tableSchema, expectedSchema, counter);
-      this.asStructLike = new 
InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+      this.asStructLike =
+          new InternalRowWrapper(
+              SparkSchemaUtil.convert(requiredSchema()), 
requiredSchema().asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
index 524266f6f8..d1682b8c85 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
@@ -19,9 +19,13 @@
 package org.apache.iceberg.spark.source;
 
 import java.nio.ByteBuffer;
+import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.stream.Stream;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
@@ -40,9 +44,17 @@ class InternalRowWrapper implements StructLike {
   private InternalRow row = null;
 
   @SuppressWarnings("unchecked")
-  InternalRowWrapper(StructType rowType) {
+  InternalRowWrapper(StructType rowType, Types.StructType icebergSchema) {
     this.types = 
Stream.of(rowType.fields()).map(StructField::dataType).toArray(DataType[]::new);
-    this.getters = 
Stream.of(types).map(InternalRowWrapper::getter).toArray(BiFunction[]::new);
+    Preconditions.checkArgument(
+        types.length == icebergSchema.fields().size(),
+        "Invalid length: Spark struct type (%s) != Iceberg struct type (%s)",
+        types.length,
+        icebergSchema.fields().size());
+    this.getters = new BiFunction[types.length];
+    for (int i = 0; i < types.length; i++) {
+      getters[i] = getter(icebergSchema.fields().get(i).type(), types[i]);
+    }
   }
 
   InternalRowWrapper wrap(InternalRow internalRow) {
@@ -71,8 +83,13 @@ class InternalRowWrapper implements StructLike {
     row.update(pos, value);
   }
 
-  private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
+  private static BiFunction<InternalRow, Integer, ?> getter(Type icebergType, 
DataType type) {
     if (type instanceof StringType) {
+      // Spark represents UUIDs as strings
+      if (Type.TypeID.UUID == icebergType.typeId()) {
+        return (row, pos) -> 
UUID.fromString(row.getUTF8String(pos).toString());
+      }
+
       return (row, pos) -> row.getUTF8String(pos).toString();
     } else if (type instanceof DecimalType) {
       DecimalType decimal = (DecimalType) type;
@@ -82,7 +99,8 @@ class InternalRowWrapper implements StructLike {
       return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
     } else if (type instanceof StructType) {
       StructType structType = (StructType) type;
-      InternalRowWrapper nestedWrapper = new InternalRowWrapper(structType);
+      InternalRowWrapper nestedWrapper =
+          new InternalRowWrapper(structType, icebergType.asStructType());
       return (row, pos) -> nestedWrapper.wrap(row.getStruct(pos, 
structType.size()));
     }
 
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
index f17cd260f9..c34ad2f3ad 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedFanoutWriter extends 
PartitionedFanoutWriter<Intern
       StructType sparkSchema) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
-    this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+    this.internalRowWrapper = new InternalRowWrapper(sparkSchema, 
schema.asStruct());
   }
 
   @Override
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
index a860916443..6904446829 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedWriter extends 
PartitionedWriter<InternalRow> {
       StructType sparkSchema) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
-    this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+    this.internalRowWrapper = new InternalRowWrapper(sparkSchema, 
schema.asStruct());
   }
 
   @Override
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 5c6243bbb0..c5fc8e0b0f 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -391,7 +391,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
     protected InternalRowWrapper initPartitionRowWrapper(Types.StructType 
partitionType) {
       StructType sparkPartitionType = (StructType) 
SparkSchemaUtil.convert(partitionType);
-      return new InternalRowWrapper(sparkPartitionType);
+      return new InternalRowWrapper(sparkPartitionType, partitionType);
     }
 
     protected Map<Integer, StructProjection> buildPartitionProjections(
@@ -653,7 +653,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
       this.dataSpec = table.spec();
       this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());
-      this.internalRowDataWrapper = new 
InternalRowWrapper(context.dataSparkType());
+      this.internalRowDataWrapper =
+          new InternalRowWrapper(context.dataSparkType(), 
context.dataSchema().asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index fb49d0bff5..d879a1f961 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -756,7 +756,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
       this.io = io;
       this.spec = spec;
       this.partitionKey = new PartitionKey(spec, dataSchema);
-      this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+      this.internalRowWrapper = new InternalRowWrapper(dataSparkType, 
dataSchema.asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
index 9e75145faf..1b4698fe5b 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
@@ -53,7 +53,8 @@ public class TestInternalRowWrapper extends RecordWrapperTest 
{
     Iterable<InternalRow> rowList = RandomData.generateSpark(schema, 
numRecords, 101L);
 
     InternalRecordWrapper recordWrapper = new 
InternalRecordWrapper(schema.asStruct());
-    InternalRowWrapper rowWrapper = new 
InternalRowWrapper(SparkSchemaUtil.convert(schema));
+    InternalRowWrapper rowWrapper =
+        new InternalRowWrapper(SparkSchemaUtil.convert(schema), 
schema.asStruct());
 
     Iterator<Record> actual = recordList.iterator();
     Iterator<InternalRow> expected = rowList.iterator();
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index 1f4c613f74..0664400c79 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -56,7 +56,7 @@ public class TestSparkAppenderFactory extends 
TestAppenderFactory<InternalRow> {
   protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
index 8f4b3042b1..575e6658db 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
@@ -61,7 +61,7 @@ public class TestSparkFileWriterFactory extends 
TestFileWriterFactory<InternalRo
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 5e8107806a..979abd21e7 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPartitioningWriters extends 
TestPartitioningWriters<Intern
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
index baac1efe0e..9dc56abf9f 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPositionDeltaWriters extends 
TestPositionDeltaWriters<Inte
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index b3edb0e708..4643836542 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -324,7 +324,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests 
{
           new EqualityDeleteRowReader(task, table, null, table.schema(), 
false)) {
         while (reader.next()) {
           actualRowSet.add(
-              new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()))
+              new InternalRowWrapper(
+                      SparkSchemaUtil.convert(table.schema()), 
table.schema().asStruct())
                   .wrap(reader.get().copy()));
         }
       }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index ecfd6759b9..b4dde2f945 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.sql;
 
 import java.io.File;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -104,6 +105,31 @@ public class TestCreateTable extends SparkCatalogTestBase {
         table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
   }
 
+  @Test
+  public void testCreateTablePartitionedByUUID() {
+    Assertions.assertThat(validationCatalog.tableExists(tableIdent)).isFalse();
+    Schema schema = new Schema(1, Types.NestedField.optional(1, "uuid", 
Types.UUIDType.get()));
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uuid", 
16).build();
+    validationCatalog.createTable(tableIdent, schema, spec);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assertions.assertThat(table).isNotNull();
+
+    StructType expectedSchema =
+        StructType.of(Types.NestedField.optional(1, "uuid", 
Types.UUIDType.get()));
+    Assertions.assertThat(table.schema().asStruct()).isEqualTo(expectedSchema);
+    Assertions.assertThat(table.spec().fields()).hasSize(1);
+
+    String uuid = UUID.randomUUID().toString();
+
+    sql("INSERT INTO %s VALUES('%s')", tableName, uuid);
+
+    Assertions.assertThat(sql("SELECT uuid FROM %s", tableName))
+        .hasSize(1)
+        .element(0)
+        .isEqualTo(row(uuid));
+  }
+
   @Test
   public void testCreateTableInRootNamespace() {
     Assume.assumeTrue(
diff --git 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 71813c5a63..963159fe4e 100644
--- 
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ 
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -220,7 +220,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(dataSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(dataSparkType, table().schema().asStruct());
 
     try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -283,7 +284,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(dataSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(dataSparkType, table().schema().asStruct());
 
     try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
@@ -351,7 +353,8 @@ public abstract class WritersBenchmark extends 
IcebergSourceBenchmark {
 
     PartitionKey partitionKey = new PartitionKey(partitionedSpec, 
table().schema());
     StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
-    InternalRowWrapper internalRowWrapper = new 
InternalRowWrapper(deleteSparkType);
+    InternalRowWrapper internalRowWrapper =
+        new InternalRowWrapper(deleteSparkType, table().schema().asStruct());
 
     try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
       for (InternalRow row : rows) {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index a982309ec1..3c9438480d 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -251,7 +251,9 @@ abstract class BaseReader<T, TaskT extends ScanTask> 
implements Closeable {
 
     SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter 
counter) {
       super(filePath, deletes, tableSchema, expectedSchema, counter);
-      this.asStructLike = new 
InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+      this.asStructLike =
+          new InternalRowWrapper(
+              SparkSchemaUtil.convert(requiredSchema()), 
requiredSchema().asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
index 524266f6f8..d1682b8c85 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
@@ -19,9 +19,13 @@
 package org.apache.iceberg.spark.source;
 
 import java.nio.ByteBuffer;
+import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.stream.Stream;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.DataType;
@@ -40,9 +44,17 @@ class InternalRowWrapper implements StructLike {
   private InternalRow row = null;
 
   @SuppressWarnings("unchecked")
-  InternalRowWrapper(StructType rowType) {
+  InternalRowWrapper(StructType rowType, Types.StructType icebergSchema) {
     this.types = Stream.of(rowType.fields()).map(StructField::dataType).toArray(DataType[]::new);
-    this.getters = Stream.of(types).map(InternalRowWrapper::getter).toArray(BiFunction[]::new);
+    Preconditions.checkArgument(
+        types.length == icebergSchema.fields().size(),
+        "Invalid length: Spark struct type (%s) != Iceberg struct type (%s)",
+        types.length,
+        icebergSchema.fields().size());
+    this.getters = new BiFunction[types.length];
+    for (int i = 0; i < types.length; i++) {
+      getters[i] = getter(icebergSchema.fields().get(i).type(), types[i]);
+    }
   }
 
   InternalRowWrapper wrap(InternalRow internalRow) {
@@ -71,8 +83,13 @@ class InternalRowWrapper implements StructLike {
     row.update(pos, value);
   }
 
-  private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
+  private static BiFunction<InternalRow, Integer, ?> getter(Type icebergType, DataType type) {
     if (type instanceof StringType) {
+      // Spark represents UUIDs as strings
+      if (Type.TypeID.UUID == icebergType.typeId()) {
+        return (row, pos) -> UUID.fromString(row.getUTF8String(pos).toString());
+      }
+      }
+
       return (row, pos) -> row.getUTF8String(pos).toString();
     } else if (type instanceof DecimalType) {
       DecimalType decimal = (DecimalType) type;
@@ -82,7 +99,8 @@ class InternalRowWrapper implements StructLike {
       return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
     } else if (type instanceof StructType) {
       StructType structType = (StructType) type;
-      InternalRowWrapper nestedWrapper = new InternalRowWrapper(structType);
+      InternalRowWrapper nestedWrapper =
+          new InternalRowWrapper(structType, icebergType.asStructType());
       return (row, pos) -> nestedWrapper.wrap(row.getStruct(pos, structType.size()));
     }
 
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
index f17cd260f9..c34ad2f3ad 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedFanoutWriter extends 
PartitionedFanoutWriter<Intern
       StructType sparkSchema) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
-    this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+    this.internalRowWrapper = new InternalRowWrapper(sparkSchema, 
schema.asStruct());
   }
 
   @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
index a860916443..6904446829 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedWriter extends 
PartitionedWriter<InternalRow> {
       StructType sparkSchema) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.partitionKey = new PartitionKey(spec, schema);
-    this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+    this.internalRowWrapper = new InternalRowWrapper(sparkSchema, 
schema.asStruct());
   }
 
   @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 022283631f..a964f76863 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -402,7 +402,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
     protected InternalRowWrapper initPartitionRowWrapper(Types.StructType 
partitionType) {
       StructType sparkPartitionType = (StructType) 
SparkSchemaUtil.convert(partitionType);
-      return new InternalRowWrapper(sparkPartitionType);
+      return new InternalRowWrapper(sparkPartitionType, partitionType);
     }
 
     protected Map<Integer, StructProjection> buildPartitionProjections(
@@ -663,7 +663,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
 
       this.dataSpec = table.spec();
       this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());
-      this.internalRowDataWrapper = new 
InternalRowWrapper(context.dataSparkType());
+      this.internalRowDataWrapper =
+          new InternalRowWrapper(context.dataSparkType(), 
context.dataSchema().asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 32f560a7bd..d23c473bb4 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -772,7 +772,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
       this.io = io;
       this.spec = spec;
       this.partitionKey = new PartitionKey(spec, dataSchema);
-      this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+      this.internalRowWrapper = new InternalRowWrapper(dataSparkType, 
dataSchema.asStruct());
     }
 
     @Override
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
index a0bc982002..0c869aa8e7 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
@@ -54,7 +54,8 @@ public class TestInternalRowWrapper extends RecordWrapperTest 
{
     Iterable<InternalRow> rowList = RandomData.generateSpark(schema, 
numRecords, 101L);
 
     InternalRecordWrapper recordWrapper = new 
InternalRecordWrapper(schema.asStruct());
-    InternalRowWrapper rowWrapper = new 
InternalRowWrapper(SparkSchemaUtil.convert(schema));
+    InternalRowWrapper rowWrapper =
+        new InternalRowWrapper(SparkSchemaUtil.convert(schema), 
schema.asStruct());
 
     Iterator<Record> actual = recordList.iterator();
     Iterator<InternalRow> expected = rowList.iterator();
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index 1f4c613f74..0664400c79 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -56,7 +56,7 @@ public class TestSparkAppenderFactory extends 
TestAppenderFactory<InternalRow> {
   protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
index 8f4b3042b1..575e6658db 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
@@ -61,7 +61,7 @@ public class TestSparkFileWriterFactory extends 
TestFileWriterFactory<InternalRo
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 5e8107806a..979abd21e7 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPartitioningWriters extends 
TestPartitioningWriters<Intern
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
index baac1efe0e..9dc56abf9f 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPositionDeltaWriters extends 
TestPositionDeltaWriters<Inte
     StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
     StructType sparkType = SparkSchemaUtil.convert(table.schema());
     for (InternalRow row : rows) {
-      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+      InternalRowWrapper wrapper = new InternalRowWrapper(sparkType, 
table.schema().asStruct());
       set.add(wrapper.wrap(row));
     }
     return set;
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index b3edb0e708..4643836542 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -324,7 +324,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests 
{
           new EqualityDeleteRowReader(task, table, null, table.schema(), 
false)) {
         while (reader.next()) {
           actualRowSet.add(
-              new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()))
+              new InternalRowWrapper(
+                      SparkSchemaUtil.convert(table.schema()), 
table.schema().asStruct())
                   .wrap(reader.get().copy()));
         }
       }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 718233c697..73be40e050 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.nio.file.Files;
+import java.util.UUID;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -111,6 +112,31 @@ public class TestCreateTable extends CatalogTestBase {
         .isNull();
   }
 
+  @TestTemplate
+  public void testCreateTablePartitionedByUUID() {
+    Assertions.assertThat(validationCatalog.tableExists(tableIdent)).isFalse();
+    Schema schema = new Schema(1, Types.NestedField.optional(1, "uuid", Types.UUIDType.get()));
+    PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uuid", 16).build();
+    validationCatalog.createTable(tableIdent, schema, spec);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assertions.assertThat(table).isNotNull();
+
+    StructType expectedSchema =
+        StructType.of(Types.NestedField.optional(1, "uuid", Types.UUIDType.get()));
+    Assertions.assertThat(table.schema().asStruct()).isEqualTo(expectedSchema);
+    Assertions.assertThat(table.spec().fields()).hasSize(1);
+
+    String uuid = UUID.randomUUID().toString();
+
+    sql("INSERT INTO %s VALUES('%s')", tableName, uuid);
+
+    Assertions.assertThat(sql("SELECT uuid FROM %s", tableName))
+        .hasSize(1)
+        .element(0)
+        .isEqualTo(row(uuid));
+  }
+
   @TestTemplate
   public void testCreateTableInRootNamespace() {
     assumeThat(catalogName)

Reply via email to