This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new bd046f844a Spark: Fix issue when partitioning by UUID (#8250)
bd046f844a is described below
commit bd046f844a1cbad6c98919d8ea63176aeae78d33
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Thu May 16 18:48:58 2024 +0200
Spark: Fix issue when partitioning by UUID (#8250)
---
.../java/org/apache/iceberg/RecordWrapperTest.java | 4 ++--
.../iceberg/spark/source/WritersBenchmark.java | 9 +++++---
.../apache/iceberg/spark/source/BaseReader.java | 4 +++-
.../iceberg/spark/source/InternalRowWrapper.java | 26 +++++++++++++++++----
.../spark/source/SparkPartitionedFanoutWriter.java | 2 +-
.../spark/source/SparkPartitionedWriter.java | 2 +-
.../spark/source/SparkPositionDeltaWrite.java | 5 ++--
.../apache/iceberg/spark/source/SparkWrite.java | 2 +-
.../spark/source/TestInternalRowWrapper.java | 3 ++-
.../spark/source/TestSparkAppenderFactory.java | 2 +-
.../spark/source/TestSparkFileWriterFactory.java | 2 +-
.../spark/source/TestSparkPartitioningWriters.java | 2 +-
.../source/TestSparkPositionDeltaWriters.java | 2 +-
.../spark/source/TestSparkReaderDeletes.java | 3 ++-
.../apache/iceberg/spark/sql/TestCreateTable.java | 27 ++++++++++++++++++++++
.../iceberg/spark/source/WritersBenchmark.java | 9 +++++---
.../apache/iceberg/spark/source/BaseReader.java | 4 +++-
.../iceberg/spark/source/InternalRowWrapper.java | 26 +++++++++++++++++----
.../spark/source/SparkPartitionedFanoutWriter.java | 2 +-
.../spark/source/SparkPartitionedWriter.java | 2 +-
.../spark/source/SparkPositionDeltaWrite.java | 5 ++--
.../apache/iceberg/spark/source/SparkWrite.java | 2 +-
.../spark/source/TestInternalRowWrapper.java | 3 ++-
.../spark/source/TestSparkAppenderFactory.java | 2 +-
.../spark/source/TestSparkFileWriterFactory.java | 2 +-
.../spark/source/TestSparkPartitioningWriters.java | 2 +-
.../source/TestSparkPositionDeltaWriters.java | 2 +-
.../spark/source/TestSparkReaderDeletes.java | 3 ++-
.../apache/iceberg/spark/sql/TestCreateTable.java | 26 +++++++++++++++++++++
.../iceberg/spark/source/WritersBenchmark.java | 9 +++++---
.../apache/iceberg/spark/source/BaseReader.java | 4 +++-
.../iceberg/spark/source/InternalRowWrapper.java | 26 +++++++++++++++++----
.../spark/source/SparkPartitionedFanoutWriter.java | 2 +-
.../spark/source/SparkPartitionedWriter.java | 2 +-
.../spark/source/SparkPositionDeltaWrite.java | 5 ++--
.../apache/iceberg/spark/source/SparkWrite.java | 2 +-
.../spark/source/TestInternalRowWrapper.java | 3 ++-
.../spark/source/TestSparkAppenderFactory.java | 2 +-
.../spark/source/TestSparkFileWriterFactory.java | 2 +-
.../spark/source/TestSparkPartitioningWriters.java | 2 +-
.../source/TestSparkPositionDeltaWriters.java | 2 +-
.../spark/source/TestSparkReaderDeletes.java | 3 ++-
.../apache/iceberg/spark/sql/TestCreateTable.java | 26 +++++++++++++++++++++
43 files changed, 216 insertions(+), 59 deletions(-)
diff --git a/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java b/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java
index 22b928d238..94e9825fa5 100644
--- a/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java
+++ b/data/src/test/java/org/apache/iceberg/RecordWrapperTest.java
@@ -44,8 +44,8 @@ public abstract class RecordWrapperTest {
optional(113, "bytes", Types.BinaryType.get()),
required(114, "dec_9_0", Types.DecimalType.of(9, 0)),
required(115, "dec_11_2", Types.DecimalType.of(11, 2)),
- required(116, "dec_38_10", Types.DecimalType.of(38, 10)) // maximum precision
- );
+ required(116, "dec_38_10", Types.DecimalType.of(38, 10)), // maximum precision
+ optional(117, "uuid", Types.UUIDType.get()));
private static final Types.StructType TIMESTAMP_WITHOUT_ZONE =
Types.StructType.of(
diff --git a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 13ff034e4b..dd2cf80716 100644
--- a/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++ b/spark/v3.3/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -193,7 +193,8 @@ public abstract class WritersBenchmark extends IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new InternalRowWrapper(dataSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(dataSparkType, table().schema().asStruct());
try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
@@ -256,7 +257,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(dataSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(dataSparkType, table().schema().asStruct());
try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
@@ -324,7 +326,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(deleteSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(deleteSparkType, table().schema().asStruct());
try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index 4fb838202c..0882edcb7c 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -259,7 +259,9 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter
counter) {
super(filePath, deletes, tableSchema, expectedSchema, counter);
- this.asStructLike = new
InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+ this.asStructLike =
+ new InternalRowWrapper(
+ SparkSchemaUtil.convert(requiredSchema()),
requiredSchema().asStruct());
}
@Override
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
index 524266f6f8..d1682b8c85 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
@@ -19,9 +19,13 @@
package org.apache.iceberg.spark.source;
import java.nio.ByteBuffer;
+import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
@@ -40,9 +44,17 @@ class InternalRowWrapper implements StructLike {
private InternalRow row = null;
@SuppressWarnings("unchecked")
- InternalRowWrapper(StructType rowType) {
+ InternalRowWrapper(StructType rowType, Types.StructType icebergSchema) {
this.types =
Stream.of(rowType.fields()).map(StructField::dataType).toArray(DataType[]::new);
- this.getters = Stream.of(types).map(InternalRowWrapper::getter).toArray(BiFunction[]::new);
+ Preconditions.checkArgument(
+ types.length == icebergSchema.fields().size(),
+ "Invalid length: Spark struct type (%s) != Iceberg struct type (%s)",
+ types.length,
+ icebergSchema.fields().size());
+ this.getters = new BiFunction[types.length];
+ for (int i = 0; i < types.length; i++) {
+ getters[i] = getter(icebergSchema.fields().get(i).type(), types[i]);
+ }
}
InternalRowWrapper wrap(InternalRow internalRow) {
@@ -71,8 +83,13 @@ class InternalRowWrapper implements StructLike {
row.update(pos, value);
}
- private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
+ private static BiFunction<InternalRow, Integer, ?> getter(Type icebergType, DataType type) {
if (type instanceof StringType) {
+ // Spark represents UUIDs as strings
+ if (Type.TypeID.UUID == icebergType.typeId()) {
+ return (row, pos) -> UUID.fromString(row.getUTF8String(pos).toString());
+ }
+
return (row, pos) -> row.getUTF8String(pos).toString();
} else if (type instanceof DecimalType) {
DecimalType decimal = (DecimalType) type;
@@ -82,7 +99,8 @@ class InternalRowWrapper implements StructLike {
return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
} else if (type instanceof StructType) {
StructType structType = (StructType) type;
- InternalRowWrapper nestedWrapper = new InternalRowWrapper(structType);
+ InternalRowWrapper nestedWrapper =
+ new InternalRowWrapper(structType, icebergType.asStructType());
return (row, pos) -> nestedWrapper.wrap(row.getStruct(pos,
structType.size()));
}
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
index f17cd260f9..c34ad2f3ad 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedFanoutWriter extends
PartitionedFanoutWriter<Intern
StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
- this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+ this.internalRowWrapper = new InternalRowWrapper(sparkSchema,
schema.asStruct());
}
@Override
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
index a860916443..6904446829 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedWriter extends
PartitionedWriter<InternalRow> {
StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
- this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+ this.internalRowWrapper = new InternalRowWrapper(sparkSchema,
schema.asStruct());
}
@Override
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index ce4b248e0f..2d38292439 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -375,7 +375,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
protected InternalRowWrapper initPartitionRowWrapper(Types.StructType
partitionType) {
StructType sparkPartitionType = (StructType)
SparkSchemaUtil.convert(partitionType);
- return new InternalRowWrapper(sparkPartitionType);
+ return new InternalRowWrapper(sparkPartitionType, partitionType);
}
protected Map<Integer, StructProjection> buildPartitionProjections(
@@ -645,7 +645,8 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.dataSpec = table.spec();
this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());
- this.internalRowDataWrapper = new
InternalRowWrapper(context.dataSparkType());
+ this.internalRowDataWrapper =
+ new InternalRowWrapper(context.dataSparkType(),
context.dataSchema().asStruct());
}
@Override
diff --git
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 59fecd45cc..c26b7f5f38 100644
---
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -741,7 +741,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.io = io;
this.spec = spec;
this.partitionKey = new PartitionKey(spec, dataSchema);
- this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+ this.internalRowWrapper = new InternalRowWrapper(dataSparkType,
dataSchema.asStruct());
}
@Override
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
index 9e75145faf..1b4698fe5b 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
@@ -53,7 +53,8 @@ public class TestInternalRowWrapper extends RecordWrapperTest
{
Iterable<InternalRow> rowList = RandomData.generateSpark(schema,
numRecords, 101L);
InternalRecordWrapper recordWrapper = new
InternalRecordWrapper(schema.asStruct());
- InternalRowWrapper rowWrapper = new
InternalRowWrapper(SparkSchemaUtil.convert(schema));
+ InternalRowWrapper rowWrapper =
+ new InternalRowWrapper(SparkSchemaUtil.convert(schema),
schema.asStruct());
Iterator<Record> actual = recordList.iterator();
Iterator<InternalRow> expected = rowList.iterator();
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index 1f4c613f74..0664400c79 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -56,7 +56,7 @@ public class TestSparkAppenderFactory extends
TestAppenderFactory<InternalRow> {
protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
index 8f4b3042b1..575e6658db 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
@@ -61,7 +61,7 @@ public class TestSparkFileWriterFactory extends
TestFileWriterFactory<InternalRo
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 5e8107806a..979abd21e7 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPartitioningWriters extends
TestPartitioningWriters<Intern
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
index baac1efe0e..9dc56abf9f 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPositionDeltaWriters extends
TestPositionDeltaWriters<Inte
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 88fd3d96dd..76a4143fcb 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -315,7 +315,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests
{
new EqualityDeleteRowReader(task, table, null, table.schema(),
false)) {
while (reader.next()) {
actualRowSet.add(
- new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()))
+ new InternalRowWrapper(
+ SparkSchemaUtil.convert(table.schema()),
table.schema().asStruct())
.wrap(reader.get().copy()));
}
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index a6256afcdf..3a62361590 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.sql;
import java.io.File;
import java.util.Map;
+import java.util.UUID;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
@@ -33,6 +34,7 @@ import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StructType;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -104,6 +106,31 @@ public class TestCreateTable extends SparkCatalogTestBase {
table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
}
+ @Test
+ public void testCreateTablePartitionedByUUID() {
+ Assertions.assertThat(validationCatalog.tableExists(tableIdent)).isFalse();
+ Schema schema = new Schema(1, Types.NestedField.optional(1, "uuid", Types.UUIDType.get()));
+ PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uuid",
16).build();
+ validationCatalog.createTable(tableIdent, schema, spec);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assertions.assertThat(table).isNotNull();
+
+ StructType expectedSchema =
+ StructType.of(Types.NestedField.optional(1, "uuid",
Types.UUIDType.get()));
+ Assertions.assertThat(table.schema().asStruct()).isEqualTo(expectedSchema);
+ Assertions.assertThat(table.spec().fields()).hasSize(1);
+
+ String uuid = UUID.randomUUID().toString();
+
+ sql("INSERT INTO %s VALUES('%s')", tableName, uuid);
+
+ Assertions.assertThat(sql("SELECT uuid FROM %s", tableName))
+ .hasSize(1)
+ .element(0)
+ .isEqualTo(row(uuid));
+ }
+
@Test
public void testCreateTableInRootNamespace() {
Assume.assumeTrue(
diff --git
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 71813c5a63..963159fe4e 100644
---
a/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++
b/spark/v3.4/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -220,7 +220,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(dataSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(dataSparkType, table().schema().asStruct());
try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
@@ -283,7 +284,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(dataSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(dataSparkType, table().schema().asStruct());
try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
@@ -351,7 +353,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(deleteSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(deleteSparkType, table().schema().asStruct());
try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index c2b3e7c2dc..238069e1c9 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -264,7 +264,9 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter
counter) {
super(filePath, deletes, tableSchema, expectedSchema, counter);
- this.asStructLike = new
InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+ this.asStructLike =
+ new InternalRowWrapper(
+ SparkSchemaUtil.convert(requiredSchema()),
requiredSchema().asStruct());
}
@Override
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
index 524266f6f8..d1682b8c85 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
@@ -19,9 +19,13 @@
package org.apache.iceberg.spark.source;
import java.nio.ByteBuffer;
+import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
@@ -40,9 +44,17 @@ class InternalRowWrapper implements StructLike {
private InternalRow row = null;
@SuppressWarnings("unchecked")
- InternalRowWrapper(StructType rowType) {
+ InternalRowWrapper(StructType rowType, Types.StructType icebergSchema) {
this.types =
Stream.of(rowType.fields()).map(StructField::dataType).toArray(DataType[]::new);
- this.getters = Stream.of(types).map(InternalRowWrapper::getter).toArray(BiFunction[]::new);
+ Preconditions.checkArgument(
+ types.length == icebergSchema.fields().size(),
+ "Invalid length: Spark struct type (%s) != Iceberg struct type (%s)",
+ types.length,
+ icebergSchema.fields().size());
+ this.getters = new BiFunction[types.length];
+ for (int i = 0; i < types.length; i++) {
+ getters[i] = getter(icebergSchema.fields().get(i).type(), types[i]);
+ }
}
InternalRowWrapper wrap(InternalRow internalRow) {
@@ -71,8 +83,13 @@ class InternalRowWrapper implements StructLike {
row.update(pos, value);
}
- private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
+ private static BiFunction<InternalRow, Integer, ?> getter(Type icebergType, DataType type) {
if (type instanceof StringType) {
+ // Spark represents UUIDs as strings
+ if (Type.TypeID.UUID == icebergType.typeId()) {
+ return (row, pos) -> UUID.fromString(row.getUTF8String(pos).toString());
+ }
+
return (row, pos) -> row.getUTF8String(pos).toString();
} else if (type instanceof DecimalType) {
DecimalType decimal = (DecimalType) type;
@@ -82,7 +99,8 @@ class InternalRowWrapper implements StructLike {
return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
} else if (type instanceof StructType) {
StructType structType = (StructType) type;
- InternalRowWrapper nestedWrapper = new InternalRowWrapper(structType);
+ InternalRowWrapper nestedWrapper =
+ new InternalRowWrapper(structType, icebergType.asStructType());
return (row, pos) -> nestedWrapper.wrap(row.getStruct(pos,
structType.size()));
}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
index f17cd260f9..c34ad2f3ad 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedFanoutWriter extends
PartitionedFanoutWriter<Intern
StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
- this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+ this.internalRowWrapper = new InternalRowWrapper(sparkSchema,
schema.asStruct());
}
@Override
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
index a860916443..6904446829 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedWriter extends
PartitionedWriter<InternalRow> {
StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
- this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+ this.internalRowWrapper = new InternalRowWrapper(sparkSchema,
schema.asStruct());
}
@Override
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 5c6243bbb0..c5fc8e0b0f 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -391,7 +391,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
protected InternalRowWrapper initPartitionRowWrapper(Types.StructType
partitionType) {
StructType sparkPartitionType = (StructType)
SparkSchemaUtil.convert(partitionType);
- return new InternalRowWrapper(sparkPartitionType);
+ return new InternalRowWrapper(sparkPartitionType, partitionType);
}
protected Map<Integer, StructProjection> buildPartitionProjections(
@@ -653,7 +653,8 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.dataSpec = table.spec();
this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());
- this.internalRowDataWrapper = new
InternalRowWrapper(context.dataSparkType());
+ this.internalRowDataWrapper =
+ new InternalRowWrapper(context.dataSparkType(),
context.dataSchema().asStruct());
}
@Override
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index fb49d0bff5..d879a1f961 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -756,7 +756,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.io = io;
this.spec = spec;
this.partitionKey = new PartitionKey(spec, dataSchema);
- this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+ this.internalRowWrapper = new InternalRowWrapper(dataSparkType,
dataSchema.asStruct());
}
@Override
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
index 9e75145faf..1b4698fe5b 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
@@ -53,7 +53,8 @@ public class TestInternalRowWrapper extends RecordWrapperTest
{
Iterable<InternalRow> rowList = RandomData.generateSpark(schema,
numRecords, 101L);
InternalRecordWrapper recordWrapper = new
InternalRecordWrapper(schema.asStruct());
- InternalRowWrapper rowWrapper = new
InternalRowWrapper(SparkSchemaUtil.convert(schema));
+ InternalRowWrapper rowWrapper =
+ new InternalRowWrapper(SparkSchemaUtil.convert(schema),
schema.asStruct());
Iterator<Record> actual = recordList.iterator();
Iterator<InternalRow> expected = rowList.iterator();
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index 1f4c613f74..0664400c79 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -56,7 +56,7 @@ public class TestSparkAppenderFactory extends
TestAppenderFactory<InternalRow> {
protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
index 8f4b3042b1..575e6658db 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
@@ -61,7 +61,7 @@ public class TestSparkFileWriterFactory extends
TestFileWriterFactory<InternalRo
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 5e8107806a..979abd21e7 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPartitioningWriters extends
TestPartitioningWriters<Intern
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
index baac1efe0e..9dc56abf9f 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPositionDeltaWriters extends
TestPositionDeltaWriters<Inte
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index b3edb0e708..4643836542 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -324,7 +324,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests
{
new EqualityDeleteRowReader(task, table, null, table.schema(),
false)) {
while (reader.next()) {
actualRowSet.add(
- new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()))
+ new InternalRowWrapper(
+ SparkSchemaUtil.convert(table.schema()),
table.schema().asStruct())
.wrap(reader.get().copy()));
}
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index ecfd6759b9..b4dde2f945 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.sql;
import java.io.File;
import java.util.Map;
+import java.util.UUID;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -104,6 +105,31 @@ public class TestCreateTable extends SparkCatalogTestBase {
table.properties().get(TableProperties.DEFAULT_FILE_FORMAT));
}
+ @Test
+ public void testCreateTablePartitionedByUUID() {
+ Assertions.assertThat(validationCatalog.tableExists(tableIdent)).isFalse();
+ Schema schema = new Schema(1, Types.NestedField.optional(1, "uuid",
Types.UUIDType.get()));
+ PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uuid",
16).build();
+ validationCatalog.createTable(tableIdent, schema, spec);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assertions.assertThat(table).isNotNull();
+
+ StructType expectedSchema =
+ StructType.of(Types.NestedField.optional(1, "uuid",
Types.UUIDType.get()));
+ Assertions.assertThat(table.schema().asStruct()).isEqualTo(expectedSchema);
+ Assertions.assertThat(table.spec().fields()).hasSize(1);
+
+ String uuid = UUID.randomUUID().toString();
+
+ sql("INSERT INTO %s VALUES('%s')", tableName, uuid);
+
+ Assertions.assertThat(sql("SELECT uuid FROM %s", tableName))
+ .hasSize(1)
+ .element(0)
+ .isEqualTo(row(uuid));
+ }
+
@Test
public void testCreateTableInRootNamespace() {
Assume.assumeTrue(
diff --git
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
index 71813c5a63..963159fe4e 100644
---
a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
+++
b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/WritersBenchmark.java
@@ -220,7 +220,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(dataSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(dataSparkType, table().schema().asStruct());
try (ClusteredDataWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
@@ -283,7 +284,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType dataSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(dataSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(dataSparkType, table().schema().asStruct());
try (FanoutDataWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
@@ -351,7 +353,8 @@ public abstract class WritersBenchmark extends
IcebergSourceBenchmark {
PartitionKey partitionKey = new PartitionKey(partitionedSpec,
table().schema());
StructType deleteSparkType = SparkSchemaUtil.convert(table().schema());
- InternalRowWrapper internalRowWrapper = new
InternalRowWrapper(deleteSparkType);
+ InternalRowWrapper internalRowWrapper =
+ new InternalRowWrapper(deleteSparkType, table().schema().asStruct());
try (ClusteredEqualityDeleteWriter<InternalRow> closeableWriter = writer) {
for (InternalRow row : rows) {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
index a982309ec1..3c9438480d 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java
@@ -251,7 +251,9 @@ abstract class BaseReader<T, TaskT extends ScanTask>
implements Closeable {
SparkDeleteFilter(String filePath, List<DeleteFile> deletes, DeleteCounter
counter) {
super(filePath, deletes, tableSchema, expectedSchema, counter);
- this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema()));
+ this.asStructLike =
+ new InternalRowWrapper(
+ SparkSchemaUtil.convert(requiredSchema()), requiredSchema().asStruct());
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
index 524266f6f8..d1682b8c85 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/InternalRowWrapper.java
@@ -19,9 +19,13 @@
package org.apache.iceberg.spark.source;
import java.nio.ByteBuffer;
+import java.util.UUID;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.BinaryType;
import org.apache.spark.sql.types.DataType;
@@ -40,9 +44,17 @@ class InternalRowWrapper implements StructLike {
private InternalRow row = null;
@SuppressWarnings("unchecked")
- InternalRowWrapper(StructType rowType) {
+ InternalRowWrapper(StructType rowType, Types.StructType icebergSchema) {
this.types =
Stream.of(rowType.fields()).map(StructField::dataType).toArray(DataType[]::new);
- this.getters = Stream.of(types).map(InternalRowWrapper::getter).toArray(BiFunction[]::new);
+ Preconditions.checkArgument(
+ types.length == icebergSchema.fields().size(),
+ "Invalid length: Spark struct type (%s) != Iceberg struct type (%s)",
+ types.length,
+ icebergSchema.fields().size());
+ this.getters = new BiFunction[types.length];
+ for (int i = 0; i < types.length; i++) {
+ getters[i] = getter(icebergSchema.fields().get(i).type(), types[i]);
+ }
}
InternalRowWrapper wrap(InternalRow internalRow) {
@@ -71,8 +83,13 @@ class InternalRowWrapper implements StructLike {
row.update(pos, value);
}
- private static BiFunction<InternalRow, Integer, ?> getter(DataType type) {
+ private static BiFunction<InternalRow, Integer, ?> getter(Type icebergType, DataType type) {
if (type instanceof StringType) {
+ // Spark represents UUIDs as strings
+ if (Type.TypeID.UUID == icebergType.typeId()) {
+ return (row, pos) -> UUID.fromString(row.getUTF8String(pos).toString());
+ }
+
return (row, pos) -> row.getUTF8String(pos).toString();
} else if (type instanceof DecimalType) {
DecimalType decimal = (DecimalType) type;
@@ -82,7 +99,8 @@ class InternalRowWrapper implements StructLike {
return (row, pos) -> ByteBuffer.wrap(row.getBinary(pos));
} else if (type instanceof StructType) {
StructType structType = (StructType) type;
- InternalRowWrapper nestedWrapper = new InternalRowWrapper(structType);
+ InternalRowWrapper nestedWrapper =
+ new InternalRowWrapper(structType, icebergType.asStructType());
return (row, pos) -> nestedWrapper.wrap(row.getStruct(pos,
structType.size()));
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
index f17cd260f9..c34ad2f3ad 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedFanoutWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedFanoutWriter extends
PartitionedFanoutWriter<Intern
StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
- this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+ this.internalRowWrapper = new InternalRowWrapper(sparkSchema, schema.asStruct());
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
index a860916443..6904446829 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitionedWriter.java
@@ -44,7 +44,7 @@ public class SparkPartitionedWriter extends
PartitionedWriter<InternalRow> {
StructType sparkSchema) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.partitionKey = new PartitionKey(spec, schema);
- this.internalRowWrapper = new InternalRowWrapper(sparkSchema);
+ this.internalRowWrapper = new InternalRowWrapper(sparkSchema, schema.asStruct());
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 022283631f..a964f76863 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -402,7 +402,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
protected InternalRowWrapper initPartitionRowWrapper(Types.StructType
partitionType) {
StructType sparkPartitionType = (StructType)
SparkSchemaUtil.convert(partitionType);
- return new InternalRowWrapper(sparkPartitionType);
+ return new InternalRowWrapper(sparkPartitionType, partitionType);
}
protected Map<Integer, StructProjection> buildPartitionProjections(
@@ -663,7 +663,8 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.dataSpec = table.spec();
this.dataPartitionKey = new PartitionKey(dataSpec, context.dataSchema());
- this.internalRowDataWrapper = new InternalRowWrapper(context.dataSparkType());
+ this.internalRowDataWrapper =
+ new InternalRowWrapper(context.dataSparkType(), context.dataSchema().asStruct());
}
@Override
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 32f560a7bd..d23c473bb4 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -772,7 +772,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.io = io;
this.spec = spec;
this.partitionKey = new PartitionKey(spec, dataSchema);
- this.internalRowWrapper = new InternalRowWrapper(dataSparkType);
+ this.internalRowWrapper = new InternalRowWrapper(dataSparkType, dataSchema.asStruct());
}
@Override
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
index a0bc982002..0c869aa8e7 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestInternalRowWrapper.java
@@ -54,7 +54,8 @@ public class TestInternalRowWrapper extends RecordWrapperTest
{
Iterable<InternalRow> rowList = RandomData.generateSpark(schema,
numRecords, 101L);
InternalRecordWrapper recordWrapper = new
InternalRecordWrapper(schema.asStruct());
- InternalRowWrapper rowWrapper = new
InternalRowWrapper(SparkSchemaUtil.convert(schema));
+ InternalRowWrapper rowWrapper =
+ new InternalRowWrapper(SparkSchemaUtil.convert(schema),
schema.asStruct());
Iterator<Record> actual = recordList.iterator();
Iterator<InternalRow> expected = rowList.iterator();
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
index 1f4c613f74..0664400c79 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAppenderFactory.java
@@ -56,7 +56,7 @@ public class TestSparkAppenderFactory extends
TestAppenderFactory<InternalRow> {
protected StructLikeSet expectedRowSet(Iterable<InternalRow> rows) {
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
index 8f4b3042b1..575e6658db 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkFileWriterFactory.java
@@ -61,7 +61,7 @@ public class TestSparkFileWriterFactory extends
TestFileWriterFactory<InternalRo
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
index 5e8107806a..979abd21e7 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPartitioningWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPartitioningWriters extends
TestPartitioningWriters<Intern
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
index baac1efe0e..9dc56abf9f 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkPositionDeltaWriters.java
@@ -61,7 +61,7 @@ public class TestSparkPositionDeltaWriters extends
TestPositionDeltaWriters<Inte
StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
StructType sparkType = SparkSchemaUtil.convert(table.schema());
for (InternalRow row : rows) {
- InternalRowWrapper wrapper = new InternalRowWrapper(sparkType);
+ InternalRowWrapper wrapper = new InternalRowWrapper(sparkType,
table.schema().asStruct());
set.add(wrapper.wrap(row));
}
return set;
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index b3edb0e708..4643836542 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -324,7 +324,8 @@ public class TestSparkReaderDeletes extends DeleteReadTests
{
new EqualityDeleteRowReader(task, table, null, table.schema(),
false)) {
while (reader.next()) {
actualRowSet.add(
- new InternalRowWrapper(SparkSchemaUtil.convert(table.schema()))
+ new InternalRowWrapper(
+ SparkSchemaUtil.convert(table.schema()),
table.schema().asStruct())
.wrap(reader.get().copy()));
}
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
index 718233c697..73be40e050 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTable.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.nio.file.Files;
+import java.util.UUID;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -111,6 +112,31 @@ public class TestCreateTable extends CatalogTestBase {
.isNull();
}
+ @TestTemplate
+ public void testCreateTablePartitionedByUUID() {
+ Assertions.assertThat(validationCatalog.tableExists(tableIdent)).isFalse();
+ Schema schema = new Schema(1, Types.NestedField.optional(1, "uuid",
Types.UUIDType.get()));
+ PartitionSpec spec = PartitionSpec.builderFor(schema).bucket("uuid",
16).build();
+ validationCatalog.createTable(tableIdent, schema, spec);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Assertions.assertThat(table).isNotNull();
+
+ StructType expectedSchema =
+ StructType.of(Types.NestedField.optional(1, "uuid",
Types.UUIDType.get()));
+ Assertions.assertThat(table.schema().asStruct()).isEqualTo(expectedSchema);
+ Assertions.assertThat(table.spec().fields()).hasSize(1);
+
+ String uuid = UUID.randomUUID().toString();
+
+ sql("INSERT INTO %s VALUES('%s')", tableName, uuid);
+
+ Assertions.assertThat(sql("SELECT uuid FROM %s", tableName))
+ .hasSize(1)
+ .element(0)
+ .isEqualTo(row(uuid));
+ }
+
@TestTemplate
public void testCreateTableInRootNamespace() {
assumeThat(catalogName)