This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2aefd5b9c0c Improve IcebergIO utils (#31958)
2aefd5b9c0c is described below
commit 2aefd5b9c0c8a4dcb577d367d859455e4a93ec14
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Jul 31 14:25:14 2024 -0400
Improve IcebergIO utils (#31958)
* Improve Iceberg utils
* add documentation; clarify variable name
* fix kinks, add type tests
---
.../org/apache/beam/sdk/io/iceberg/IcebergIO.java | 2 +-
...emaAndRowConversions.java => IcebergUtils.java} | 217 +++++--
.../apache/beam/sdk/io/iceberg/RecordWriter.java | 4 +-
.../org/apache/beam/sdk/io/iceberg/ScanSource.java | 2 +-
.../apache/beam/sdk/io/iceberg/ScanTaskReader.java | 4 +-
.../apache/beam/sdk/io/iceberg/IcebergIOIT.java | 5 +-
.../beam/sdk/io/iceberg/IcebergIOReadTest.java | 8 +-
.../beam/sdk/io/iceberg/IcebergIOWriteTest.java | 12 +-
.../IcebergReadSchemaTransformProviderTest.java | 8 +-
.../beam/sdk/io/iceberg/IcebergUtilsTest.java | 676 +++++++++++++++++++++
.../IcebergWriteSchemaTransformProviderTest.java | 5 +-
.../apache/beam/sdk/io/iceberg/ScanSourceTest.java | 6 +-
.../io/iceberg/SchemaAndRowConversionsTest.java | 268 --------
.../apache/beam/sdk/io/iceberg/TestFixtures.java | 2 +-
14 files changed, 865 insertions(+), 354 deletions(-)
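
For readers skimming this patch, a minimal usage sketch of the renamed public utility (not part of the commit; the class name, schema, and values below are illustrative only):

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.data.Record;

public class IcebergUtilsExample {
  public static void main(String[] args) {
    // A Beam schema with two primitive fields.
    Schema beamSchema =
        Schema.builder().addInt64Field("id").addStringField("data").build();

    // Beam schema -> Iceberg schema (field IDs are assigned starting at 1).
    org.apache.iceberg.Schema icebergSchema =
        IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    // Beam Row -> Iceberg Record, then back to a Beam Row.
    Row row = Row.withSchema(beamSchema).addValues(42L, "bizzle").build();
    Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
    Row roundTripped =
        IcebergUtils.icebergRecordToBeamRow(
            IcebergUtils.icebergSchemaToBeamSchema(icebergSchema), record);

    System.out.println(record);
    System.out.println(roundTripped);
  }
}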
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 50e0ea8b63d..c3c1da7c788 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -137,7 +137,7 @@ public class IcebergIO {
.setCatalogConfig(getCatalogConfig())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(tableId)
- .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
+ .setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
.build())));
}
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
similarity index 50%
rename from sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java
rename to sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index e1a8685614f..a2f84e6475c 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -20,36 +20,42 @@ package org.apache.beam.sdk.io.iceberg;
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-class SchemaAndRowConversions {
+/** Utilities for converting between Beam and Iceberg types. */
+public class IcebergUtils {
+ // This is made public for users' convenience, as many may have more experience working with
+ // Iceberg types.
- private SchemaAndRowConversions() {}
+ private IcebergUtils() {}
- static final Map<Schema.FieldType, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
- ImmutableMap.<Schema.FieldType, Type>builder()
- .put(Schema.FieldType.BOOLEAN, Types.BooleanType.get())
- .put(Schema.FieldType.INT32, Types.IntegerType.get())
- .put(Schema.FieldType.INT64, Types.LongType.get())
- .put(Schema.FieldType.FLOAT, Types.FloatType.get())
- .put(Schema.FieldType.DOUBLE, Types.DoubleType.get())
- .put(Schema.FieldType.STRING, Types.StringType.get())
- .put(Schema.FieldType.BYTES, Types.BinaryType.get())
+ private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
+ ImmutableMap.<Schema.TypeName, Type>builder()
+ .put(Schema.TypeName.BOOLEAN, Types.BooleanType.get())
+ .put(Schema.TypeName.INT32, Types.IntegerType.get())
+ .put(Schema.TypeName.INT64, Types.LongType.get())
+ .put(Schema.TypeName.FLOAT, Types.FloatType.get())
+ .put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
+ .put(Schema.TypeName.STRING, Types.StringType.get())
+ .put(Schema.TypeName.BYTES, Types.BinaryType.get())
.build();
- public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
+ private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
switch (type.typeId()) {
case BOOLEAN:
return Schema.FieldType.BOOLEAN;
@@ -86,11 +92,12 @@ class SchemaAndRowConversions {
throw new RuntimeException("Unrecognized IcebergIO Type");
}
- public static Schema.Field icebergFieldToBeamField(final Types.NestedField field) {
+ private static Schema.Field icebergFieldToBeamField(final Types.NestedField field) {
return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type()))
.withNullable(field.isOptional());
}
+ /** Converts an Iceberg {@link org.apache.iceberg.Schema} to a Beam {@link Schema}. */
public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema schema) {
Schema.Builder builder = Schema.builder();
for (Types.NestedField f : schema.columns()) {
@@ -99,7 +106,7 @@ class SchemaAndRowConversions {
return builder.build();
}
- public static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) {
+ private static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) {
Schema.Builder builder = Schema.builder();
for (Types.NestedField f : struct.fields()) {
builder.addField(icebergFieldToBeamField(f));
@@ -107,28 +114,141 @@ class SchemaAndRowConversions {
return builder.build();
}
- public static Types.NestedField beamFieldToIcebergField(int fieldId, final Schema.Field field) {
- @Nullable Type icebergType = BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+ /**
+ * Represents an Object (in practice, either {@link Type} or {@link Types.NestedField}) along with
+ * the most recent (max) ID that has been used to build this object.
+ *
+ * <p>Iceberg Schema fields are required to have unique IDs. This includes unique IDs for a {@link
+ * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and value type, and
+ * nested {@link Types.StructType}s. When constructing any of these types, we use multiple unique
+ * IDs for the type's components. The {@code maxId} in this object represents the most recent ID
+ * used after building this type. This helps signal that the next field we construct should have
+ * an ID greater than this one.
+ */
+ @VisibleForTesting
+ static class ObjectAndMaxId<T> {
+ int maxId;
+ T object;
- if (icebergType != null) {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(), icebergType);
- } else {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get());
+ ObjectAndMaxId(int id, T object) {
+ this.maxId = id;
+ this.object = object;
}
}
+ /**
+ * Given a Beam {@link Schema.FieldType} and an index, returns an Iceberg {@link Type} and the
+ * maximum index after building the Iceberg Type. This assumes the input index is already in use
+ * (usually by the parent {@link Types.NestedField}), and will start building the Iceberg type from
+ * index + 1.
+ *
+ * <p>Returns this information in an {@link ObjectAndMaxId<Type>} instance.
+ */
+ @VisibleForTesting
+ static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+ int fieldId, Schema.FieldType beamType) {
+ if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+ return new ObjectAndMaxId<>(fieldId, BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+ } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or ITERABLE
+ // List ID needs to be unique from the NestedField that contains this ListType
+ int listId = fieldId + 1;
+ Schema.FieldType beamCollectionType =
+ Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
+
+ ObjectAndMaxId<Type> listInfo = beamFieldTypeToIcebergFieldType(listId, beamCollectionType);
+ Type icebergCollectionType = listInfo.object;
+
+ boolean elementTypeIsNullable =
+ Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
+
+ Type listType =
+ elementTypeIsNullable
+ ? Types.ListType.ofOptional(listId, icebergCollectionType)
+ : Types.ListType.ofRequired(listId, icebergCollectionType);
+
+ return new ObjectAndMaxId<>(listInfo.maxId, listType);
+ } else if (beamType.getTypeName().isMapType()) { // MAP
+ // key and value IDs need to be unique from the NestedField that contains this MapType
+ int keyId = fieldId + 1;
+ int valueId = fieldId + 2;
+ int maxId = valueId;
+
+ Schema.FieldType beamKeyType = Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
+ ObjectAndMaxId<Type> keyInfo = beamFieldTypeToIcebergFieldType(maxId, beamKeyType);
+ Type icebergKeyType = keyInfo.object;
+ maxId = keyInfo.maxId;
+
+ Schema.FieldType beamValueType =
+ Preconditions.checkArgumentNotNull(beamType.getMapValueType());
+ ObjectAndMaxId<Type> valueInfo = beamFieldTypeToIcebergFieldType(maxId, beamValueType);
+ Type icebergValueType = valueInfo.object;
+ maxId = valueInfo.maxId;
+
+ Type mapType =
+ beamValueType.getNullable()
+ ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, icebergValueType)
+ : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, icebergValueType);
+
+ return new ObjectAndMaxId<>(maxId, mapType);
+ } else if (beamType.getTypeName().isCompositeType()) { // ROW
+ // Nested field IDs need to be unique from the field that contains this StructType
+ int maxFieldId = fieldId;
+
+ Schema nestedSchema = Preconditions.checkArgumentNotNull(beamType.getRowSchema());
+ List<Types.NestedField> nestedFields = new ArrayList<>(nestedSchema.getFieldCount());
+ for (Schema.Field field : nestedSchema.getFields()) {
+ ObjectAndMaxId<Types.NestedField> converted = beamFieldToIcebergField(++maxFieldId, field);
+ Types.NestedField nestedField = converted.object;
+ nestedFields.add(nestedField);
+
+ maxFieldId = converted.maxId;
+ }
+
+ Type structType = Types.StructType.of(nestedFields);
+
+ return new ObjectAndMaxId<>(maxFieldId, structType);
+ }
+
+ return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
+ }
+
+ private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
+ int fieldId, final Schema.Field field) {
+ ObjectAndMaxId<Type> typeAndMaxId = beamFieldTypeToIcebergFieldType(fieldId, field.getType());
+ Type icebergType = typeAndMaxId.object;
+ int id = typeAndMaxId.maxId;
+
+ Types.NestedField icebergField =
+ Types.NestedField.of(fieldId, field.getType().getNullable(), field.getName(), icebergType);
+
+ return new ObjectAndMaxId<>(id, icebergField);
+ }
+
+ /**
+ * Converts a Beam {@link Schema} to an Iceberg {@link org.apache.iceberg.Schema}.
+ *
+ * <p>The following unsupported Beam types will be defaulted to {@link Types.StringType}:
+ * <li>{@link Schema.TypeName.DECIMAL}
+ * <li>{@link Schema.TypeName.DATETIME}
+ * <li>{@link Schema.TypeName.LOGICAL_TYPE}
+ */
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
- int fieldId = 0;
- for (Schema.Field f : schema.getFields()) {
- fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
+ int nextIcebergFieldId = 1;
+ for (int i = 0; i < schema.getFieldCount(); i++) {
+ Schema.Field beamField = schema.getField(i);
+ ObjectAndMaxId<Types.NestedField> fieldAndMaxId =
+ beamFieldToIcebergField(nextIcebergFieldId, beamField);
+ Types.NestedField field = fieldAndMaxId.object;
+ fields[i] = field;
+
+ nextIcebergFieldId = fieldAndMaxId.maxId + 1;
}
return new org.apache.iceberg.Schema(fields);
}
- public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) {
+ /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
+ public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) {
return copyRowIntoRecord(GenericRecord.create(schema), row);
}
@@ -191,13 +311,16 @@ class SchemaAndRowConversions {
copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
break;
case LIST:
- throw new UnsupportedOperationException("List fields are not yet
supported.");
+ Optional.ofNullable(value.getArray(name)).ifPresent(list ->
rec.setField(name, list));
+ break;
case MAP:
- throw new UnsupportedOperationException("Map fields are not yet
supported.");
+ Optional.ofNullable(value.getMap(name)).ifPresent(v ->
rec.setField(name, v));
+ break;
}
}
- public static Row recordToRow(Schema schema, Record record) {
+ /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */
+ public static Row icebergRecordToBeamRow(Schema schema, Record record) {
Row.Builder rowBuilder = Row.withSchema(schema);
for (Schema.Field field : schema.getFields()) {
switch (field.getType().getTypeName()) {
@@ -221,20 +344,14 @@ class SchemaAndRowConversions {
long longValue = (long) record.getField(field.getName());
rowBuilder.addValue(longValue);
break;
- case DECIMAL:
- // Iceberg and Beam both use BigDecimal
- rowBuilder.addValue(record.getField(field.getName()));
- break;
- case FLOAT:
- // Iceberg and Beam both use float
- rowBuilder.addValue(record.getField(field.getName()));
- break;
- case DOUBLE:
- // Iceberg and Beam both use double
- rowBuilder.addValue(record.getField(field.getName()));
- break;
- case STRING:
- // Iceberg and Beam both use String
+ case DECIMAL: // Iceberg and Beam both use BigDecimal
+ case FLOAT: // Iceberg and Beam both use float
+ case DOUBLE: // Iceberg and Beam both use double
+ case STRING: // Iceberg and Beam both use String
+ case BOOLEAN: // Iceberg and Beam both use boolean
+ case ARRAY:
+ case ITERABLE:
+ case MAP:
rowBuilder.addValue(record.getField(field.getName()));
break;
case DATETIME:
@@ -242,27 +359,17 @@ class SchemaAndRowConversions {
long millis = (long) record.getField(field.getName());
rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
break;
- case BOOLEAN:
- // Iceberg and Beam both use String
- rowBuilder.addValue(record.getField(field.getName()));
- break;
case BYTES:
// Iceberg uses ByteBuffer; Beam uses byte[]
rowBuilder.addValue(((ByteBuffer) record.getField(field.getName())).array());
break;
- case ARRAY:
- throw new UnsupportedOperationException("Array fields are not yet supported.");
- case ITERABLE:
- throw new UnsupportedOperationException("Iterable fields are not yet supported.");
- case MAP:
- throw new UnsupportedOperationException("Map fields are not yet supported.");
case ROW:
Record nestedRecord = (Record) record.getField(field.getName());
Schema nestedSchema =
checkArgumentNotNull(
field.getType().getRowSchema(),
"Corrupted schema: Row type did not have associated nested
schema.");
- Row nestedRow = recordToRow(nestedSchema, nestedRecord);
+ Row nestedRow = icebergRecordToBeamRow(nestedSchema, nestedRecord);
rowBuilder.addValue(nestedRow);
break;
case LOGICAL_TYPE:
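
To make the field-ID bookkeeping described in the ObjectAndMaxId javadoc above concrete, here is a hedged sketch (again, not part of the patch; the class and schema are illustrative) of the IDs that beamSchemaToIcebergSchema assigns when a container type is involved; the expected layout mirrors the new unit tests further down:

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.beam.sdk.schemas.Schema;

public class FieldIdExample {
  public static void main(String[] args) {
    // Two top-level fields: an array of strings and a plain string.
    Schema beamSchema =
        Schema.builder()
            .addArrayField("arr", Schema.FieldType.STRING)
            .addStringField("str")
            .build();

    // Per the conversion logic above: "arr" is assigned field ID 1, its
    // element type ID 2, and "str" continues at ID 3 (the maxId carried forward).
    org.apache.iceberg.Schema icebergSchema =
        IcebergUtils.beamSchemaToIcebergSchema(beamSchema);

    // Prints the schema along with the assigned field IDs.
    System.out.println(icebergSchema);
  }
}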
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index 859310bdcec..d7212783d1b 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static org.apache.beam.sdk.io.iceberg.SchemaAndRowConversions.rowToRecord;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;
import java.io.IOException;
import org.apache.beam.sdk.values.Row;
@@ -80,7 +80,7 @@ class RecordWriter {
}
public void write(Row row) {
- Record record = rowToRecord(table.schema(), row);
+ Record record = beamRowToIcebergRecord(table.schema(), row);
icebergDataWriter.write(record);
}
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
index ed2f2eda767..ff2aa083348 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
@@ -50,7 +50,7 @@ class ScanSource extends BoundedSource<Row> {
scanConfig
.getTable()
.newScan()
- .project(SchemaAndRowConversions.beamSchemaToIcebergSchema(scanConfig.getSchema()));
+ .project(IcebergUtils.beamSchemaToIcebergSchema(scanConfig.getSchema()));
if (scanConfig.getFilter() != null) {
tableScan = tableScan.filter(scanConfig.getFilter());
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index 52e6d60c1fb..b7cb42b2eac 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -60,7 +60,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {
public ScanTaskReader(ScanTaskSource source) {
this.source = source;
- this.project = SchemaAndRowConversions.beamSchemaToIcebergSchema(source.getSchema());
+ this.project = IcebergUtils.beamSchemaToIcebergSchema(source.getSchema());
}
@Override
@@ -160,7 +160,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> {
if (current == null) {
throw new NoSuchElementException();
}
- return SchemaAndRowConversions.recordToRow(source.getSchema(), current);
+ return IcebergUtils.icebergRecordToBeamRow(source.getSchema(), current);
}
@Override
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 1c5686bfde9..3a169eeb40d 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -134,8 +134,7 @@ public class IcebergIOIT implements Serializable {
.addByteArrayField("bytes")
.build();
- static final Schema ICEBERG_SCHEMA =
- SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+ static final Schema ICEBERG_SCHEMA = IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
Map<String, Object> getValues(int num) {
String strNum = Integer.toString(num);
@@ -238,7 +237,7 @@ public class IcebergIOIT implements Serializable {
List<Row> inputRows =
inputRecords.stream()
- .map(record -> SchemaAndRowConversions.recordToRow(BEAM_SCHEMA, record))
+ .map(record -> IcebergUtils.icebergRecordToBeamRow(BEAM_SCHEMA, record))
.collect(Collectors.toList());
// Write with Beam
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index d6db3f68911..3f31073b444 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -70,7 +70,7 @@ public class IcebergIOReadTest {
TableIdentifier tableId =
TableIdentifier.of("default", "table" +
Long.toString(UUID.randomUUID().hashCode(), 16));
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
- final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+ final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
simpleTable
.newFastAppend()
@@ -91,7 +91,7 @@ public class IcebergIOReadTest {
TestFixtures.FILE2SNAPSHOT1,
TestFixtures.FILE3SNAPSHOT1)
.flatMap(List::stream)
- .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+ .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
.collect(Collectors.toList());
Properties props = new Properties();
@@ -105,9 +105,7 @@ public class IcebergIOReadTest {
testPipeline
.apply(IcebergIO.readRows(catalogConfig).from(tableId))
.apply(ParDo.of(new PrintRow()))
- .setCoder(
- RowCoder.of(
- SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
+ .setCoder(RowCoder.of(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
PAssert.that(output)
.satisfies(
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index e0a584ec9da..02213c45e07 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static org.apache.beam.sdk.io.iceberg.SchemaAndRowConversions.rowToRecord;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;
import static org.hamcrest.MatcherAssert.assertThat;
import java.io.Serializable;
@@ -85,7 +85,7 @@ public class IcebergIOWriteTest implements Serializable {
testPipeline
.apply("Records To Add",
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
-
.setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
.apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
LOG.info("Executing pipeline");
@@ -152,7 +152,7 @@ public class IcebergIOWriteTest implements Serializable {
TestFixtures.FILE1SNAPSHOT1,
TestFixtures.FILE1SNAPSHOT2,
TestFixtures.FILE1SNAPSHOT3))))
- .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
.apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations));
LOG.info("Executing pipeline");
@@ -235,7 +235,7 @@ public class IcebergIOWriteTest implements Serializable {
testPipeline
.apply("Records To Add", Create.of(TestFixtures.asRows(elements)))
- .setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
.apply("Append To Table", IcebergIO.writeRows(catalog).to(dynamicDestinations));
LOG.info("Executing pipeline");
@@ -262,9 +262,9 @@ public class IcebergIOWriteTest implements Serializable {
// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
Record record =
- rowToRecord(
+ beamRowToIcebergRecord(
table.schema(),
- Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ Row.withSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
.addValues(42L, "bizzle")
.build());
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
index bc15021fa2b..effb5cc4838 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
@@ -73,7 +73,7 @@ public class IcebergReadSchemaTransformProviderTest {
TableIdentifier tableId = TableIdentifier.parse(identifier);
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
- final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+ final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
simpleTable
.newFastAppend()
@@ -94,7 +94,7 @@ public class IcebergReadSchemaTransformProviderTest {
TestFixtures.FILE2SNAPSHOT1,
TestFixtures.FILE3SNAPSHOT1)
.flatMap(List::stream)
- .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+ .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
.collect(Collectors.toList());
Map<String, String> properties = new HashMap<>();
@@ -129,7 +129,7 @@ public class IcebergReadSchemaTransformProviderTest {
TableIdentifier tableId = TableIdentifier.parse(identifier);
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
- final Schema schema = SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+ final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
simpleTable
.newFastAppend()
@@ -150,7 +150,7 @@ public class IcebergReadSchemaTransformProviderTest {
TestFixtures.FILE2SNAPSHOT1,
TestFixtures.FILE3SNAPSHOT1)
.flatMap(List::stream)
- .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+ .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
.collect(Collectors.toList());
String yamlConfig =
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
new file mode 100644
index 00000000000..c4da0b22f4d
--- /dev/null
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.ObjectAndMaxId;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(Enclosed.class)
+public class IcebergUtilsTest {
+
+ @RunWith(JUnit4.class)
+ public static class RowToRecordTests {
+ /**
+ * Checks a value that when converted to Iceberg type is the same value
when interpreted in
+ * Java.
+ */
+ private void checkRowValueToRecordValue(
+ Schema.FieldType sourceType, Type destType, Object value) {
+ checkRowValueToRecordValue(sourceType, value, destType, value);
+ }
+
+ private void checkRowValueToRecordValue(
+ Schema.FieldType sourceType, Object sourceValue, Type destType, Object
destValue) {
+ Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType));
+ Row row = Row.withSchema(beamSchema).addValues(sourceValue).build();
+
+ org.apache.iceberg.Schema icebergSchema =
+ new org.apache.iceberg.Schema(required(0, "v", destType));
+ Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
+
+ assertThat(record.getField("v"), equalTo(destValue));
+ }
+
+ @Test
+ public void testBoolean() {
+ checkRowValueToRecordValue(Schema.FieldType.BOOLEAN,
Types.BooleanType.get(), true);
+ checkRowValueToRecordValue(Schema.FieldType.BOOLEAN,
Types.BooleanType.get(), false);
+ }
+
+ @Test
+ public void testInteger() {
+ checkRowValueToRecordValue(Schema.FieldType.INT32,
Types.IntegerType.get(), -13);
+ checkRowValueToRecordValue(Schema.FieldType.INT32,
Types.IntegerType.get(), 42);
+ checkRowValueToRecordValue(Schema.FieldType.INT32,
Types.IntegerType.get(), 0);
+ }
+
+ @Test
+ public void testLong() {
+ checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(),
13L);
+ checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(),
42L);
+ }
+
+ @Test
+ public void testFloat() {
+ checkRowValueToRecordValue(Schema.FieldType.FLOAT,
Types.FloatType.get(), 3.14159f);
+ checkRowValueToRecordValue(Schema.FieldType.FLOAT,
Types.FloatType.get(), 42.0f);
+ }
+
+ @Test
+ public void testDouble() {
+ checkRowValueToRecordValue(Schema.FieldType.DOUBLE,
Types.DoubleType.get(), 3.14159);
+ }
+
+ @Test
+ public void testDate() {}
+
+ @Test
+ public void testTime() {}
+
+ @Test
+ public void testTimestamp() {
+ DateTime dateTime =
+ new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3,
4).withZone(DateTimeZone.UTC);
+
+ checkRowValueToRecordValue(
+ Schema.FieldType.DATETIME,
+ dateTime.toInstant(),
+ Types.TimestampType.withoutZone(),
+ dateTime.getMillis());
+ }
+
+ @Test
+ public void testFixed() {}
+
+ @Test
+ public void testBinary() {
+ byte[] bytes = new byte[] {1, 2, 3, 4};
+ checkRowValueToRecordValue(
+ Schema.FieldType.BYTES, bytes, Types.BinaryType.get(),
ByteBuffer.wrap(bytes));
+ }
+
+ @Test
+ public void testDecimal() {
+ BigDecimal num = BigDecimal.valueOf(123.456);
+
+ checkRowValueToRecordValue(Schema.FieldType.DECIMAL,
Types.DecimalType.of(6, 3), num);
+ }
+
+ @Test
+ public void testStruct() {
+ Schema schema = Schema.builder().addStringField("nested_str").build();
+ Row beamRow = Row.withSchema(schema).addValue("str_value").build();
+
+ Types.NestedField nestedFieldType = required(1, "nested_str",
Types.StringType.get());
+ GenericRecord icebergRow =
+ GenericRecord.create(new org.apache.iceberg.Schema(nestedFieldType));
+ icebergRow.setField("nested_str", "str_value");
+
+ checkRowValueToRecordValue(
+ Schema.FieldType.row(schema), beamRow,
Types.StructType.of(nestedFieldType), icebergRow);
+ }
+
+ @Test
+ public void testMap() {
+ Map<String, Integer> map =
+ ImmutableMap.<String, Integer>builder().put("a", 123).put("b",
456).put("c", 789).build();
+
+ checkRowValueToRecordValue(
+ Schema.FieldType.map(Schema.FieldType.STRING,
Schema.FieldType.INT32),
+ Types.MapType.ofRequired(1, 2, Types.StringType.get(),
Types.IntegerType.get()),
+ map);
+ }
+
+ @Test
+ public void testList() {
+ List<String> list = Arrays.asList("abc", "xyz", "123", "foo", "bar");
+
+ checkRowValueToRecordValue(
+ Schema.FieldType.array(Schema.FieldType.STRING),
+ Types.ListType.ofRequired(1, Types.StringType.get()),
+ list);
+ }
+ }
+
+ @RunWith(JUnit4.class)
+ public static class RecordToRowTests {
+ private void checkRecordValueToRowValue(
+ Type sourceType, Schema.FieldType destType, Object value) {
+ checkRecordValueToRowValue(sourceType, value, destType, value);
+ }
+
+ private void checkRecordValueToRowValue(
+ Type sourceType, Object sourceValue, Schema.FieldType destType, Object
destValue) {
+ Schema beamSchema = Schema.of(Schema.Field.of("v", destType));
+
+ org.apache.iceberg.Schema icebergSchema =
+ new org.apache.iceberg.Schema(required(0, "v", sourceType));
+ Record record = GenericRecord.create(icebergSchema);
+ record.setField("v", sourceValue);
+
+ Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
+
+ assertThat(row.getBaseValue("v"), equalTo(destValue));
+ }
+
+ @Test
+ public void testBoolean() {
+ checkRecordValueToRowValue(Types.BooleanType.get(),
Schema.FieldType.BOOLEAN, true);
+ checkRecordValueToRowValue(Types.BooleanType.get(),
Schema.FieldType.BOOLEAN, false);
+ }
+
+ @Test
+ public void testInteger() {
+ checkRecordValueToRowValue(Types.IntegerType.get(),
Schema.FieldType.INT32, -13);
+ checkRecordValueToRowValue(Types.IntegerType.get(),
Schema.FieldType.INT32, 42);
+ checkRecordValueToRowValue(Types.IntegerType.get(),
Schema.FieldType.INT32, 0);
+ }
+
+ @Test
+ public void testLong() {
+ checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64,
13L);
+ checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64,
42L);
+ }
+
+ @Test
+ public void testFloat() {
+ checkRecordValueToRowValue(Types.FloatType.get(),
Schema.FieldType.FLOAT, 3.14159f);
+ checkRecordValueToRowValue(Types.FloatType.get(),
Schema.FieldType.FLOAT, 42.0f);
+ }
+
+ @Test
+ public void testDouble() {
+ checkRecordValueToRowValue(Types.DoubleType.get(),
Schema.FieldType.DOUBLE, 3.14159);
+ }
+
+ @Test
+ public void testDate() {}
+
+ @Test
+ public void testTime() {}
+
+ @Test
+ public void testTimestamp() {
+ DateTime dateTime =
+ new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3,
4).withZone(DateTimeZone.UTC);
+
+ checkRecordValueToRowValue(
+ Types.TimestampType.withoutZone(),
+ dateTime.getMillis(),
+ Schema.FieldType.DATETIME,
+ dateTime.toInstant());
+ }
+
+ @Test
+ public void testFixed() {}
+
+ @Test
+ public void testBinary() {
+ byte[] bytes = new byte[] {1, 2, 3, 4};
+ checkRecordValueToRowValue(
+ Types.BinaryType.get(), ByteBuffer.wrap(bytes),
Schema.FieldType.BYTES, bytes);
+ }
+
+ @Test
+ public void testDecimal() {
+ BigDecimal num = BigDecimal.valueOf(123.456);
+
+ checkRecordValueToRowValue(Types.DecimalType.of(6, 3),
Schema.FieldType.DECIMAL, num);
+ }
+
+ @Test
+ public void testStruct() {
+ Schema schema = Schema.builder().addStringField("nested_str").build();
+ Row beamRow = Row.withSchema(schema).addValue("str_value").build();
+
+ Types.NestedField nestedFieldType = required(1, "nested_str",
Types.StringType.get());
+ GenericRecord icebergRow =
+ GenericRecord.create(new org.apache.iceberg.Schema(nestedFieldType));
+ icebergRow.setField("nested_str", "str_value");
+
+ checkRecordValueToRowValue(
+ Types.StructType.of(nestedFieldType), icebergRow,
Schema.FieldType.row(schema), beamRow);
+ }
+
+ @Test
+ public void testMap() {
+ Map<String, Integer> map =
+ ImmutableMap.<String, Integer>builder().put("a", 123).put("b",
456).put("c", 789).build();
+
+ checkRecordValueToRowValue(
+ Types.MapType.ofRequired(1, 2, Types.StringType.get(),
Types.IntegerType.get()),
+ Schema.FieldType.map(Schema.FieldType.STRING,
Schema.FieldType.INT32),
+ map);
+ }
+
+ @Test
+ public void testList() {
+ List<String> list = Arrays.asList("abc", "xyz", "123", "foo", "bar");
+
+ checkRecordValueToRowValue(
+ Types.ListType.ofRequired(1, Types.StringType.get()),
+ Schema.FieldType.iterable(Schema.FieldType.STRING),
+ list);
+ }
+ }
+
+ @RunWith(JUnit4.class)
+ public static class SchemaTests {
+ private static class BeamFieldTypeTestCase {
+ final int icebergFieldId;
+ final Schema.FieldType beamType;
+ final int expectedMaxId;
+ final Type expectedIcebergType;
+
+ BeamFieldTypeTestCase(
+ int icebergFieldId,
+ Schema.FieldType beamType,
+ int expectedMaxId,
+ Type expectedIcebergType) {
+ this.icebergFieldId = icebergFieldId;
+ this.beamType = beamType;
+ this.expectedMaxId = expectedMaxId;
+ this.expectedIcebergType = expectedIcebergType;
+ }
+ }
+
+ private void checkTypes(List<BeamFieldTypeTestCase> testCases) {
+ for (BeamFieldTypeTestCase testCase : testCases) {
+ ObjectAndMaxId<Type> ret =
+ beamFieldTypeToIcebergFieldType(testCase.icebergFieldId,
testCase.beamType);
+
+ assertEquals(testCase.expectedMaxId, ret.maxId);
+ checkEquals(testCase.expectedIcebergType, ret.object);
+ }
+ }
+
+ private void checkEquals(Type expected, Type actual) {
+ if (expected.isListType() && actual.isListType()) {
+ Type nestedExpected = expected.asListType().elementType();
+ Type nestedActual = actual.asListType().elementType();
+
+ assertEquals(nestedExpected.typeId(), nestedActual.typeId());
+ checkEquals(nestedExpected, nestedActual);
+ } else {
+ assertEquals(expected, actual);
+ }
+ }
+
+ @Test
+ public void testPrimitiveBeamFieldTypeToIcebergFieldType() {
+ List<BeamFieldTypeTestCase> primitives =
+ Arrays.asList(
+ new BeamFieldTypeTestCase(1, Schema.FieldType.BOOLEAN, 1,
Types.BooleanType.get()),
+ new BeamFieldTypeTestCase(3, Schema.FieldType.INT32, 3,
Types.IntegerType.get()),
+ new BeamFieldTypeTestCase(6, Schema.FieldType.INT64, 6,
Types.LongType.get()),
+ new BeamFieldTypeTestCase(10, Schema.FieldType.FLOAT, 10,
Types.FloatType.get()),
+ new BeamFieldTypeTestCase(7, Schema.FieldType.DOUBLE, 7,
Types.DoubleType.get()),
+ new BeamFieldTypeTestCase(11, Schema.FieldType.STRING, 11,
Types.StringType.get()),
+ new BeamFieldTypeTestCase(15, Schema.FieldType.BYTES, 15,
Types.BinaryType.get()));
+
+ checkTypes(primitives);
+ }
+
+ @Test
+ public void testArrayBeamFieldTypeToIcebergFieldType() {
+ // Iceberg sets one field ID for the List type itself and another field
ID for the collection
+ // type.
+ List<BeamFieldTypeTestCase> listTypes =
+ Arrays.asList(
+ new BeamFieldTypeTestCase(
+ 1,
+ Schema.FieldType.array(Schema.FieldType.BOOLEAN),
+ 2,
+ Types.ListType.ofRequired(1, Types.BooleanType.get())),
+ new BeamFieldTypeTestCase(
+ 3,
+ Schema.FieldType.iterable(Schema.FieldType.INT32),
+ 4,
+ Types.ListType.ofRequired(3, Types.IntegerType.get())),
+ new BeamFieldTypeTestCase(
+ 6,
+ Schema.FieldType.array(Schema.FieldType.INT64),
+ 7,
+ Types.ListType.ofRequired(6, Types.LongType.get())),
+ new BeamFieldTypeTestCase(
+ 10,
+ Schema.FieldType.array(Schema.FieldType.FLOAT),
+ 11,
+ Types.ListType.ofRequired(10, Types.FloatType.get())),
+ new BeamFieldTypeTestCase(
+ 7,
+ Schema.FieldType.iterable(Schema.FieldType.DOUBLE),
+ 8,
+ Types.ListType.ofRequired(7, Types.DoubleType.get())),
+ new BeamFieldTypeTestCase(
+ 11,
+ Schema.FieldType.array(Schema.FieldType.STRING),
+ 12,
+ Types.ListType.ofRequired(11, Types.StringType.get())),
+ new BeamFieldTypeTestCase(
+ 15,
+ Schema.FieldType.iterable(Schema.FieldType.BYTES),
+ 16,
+ Types.ListType.ofRequired(15, Types.BinaryType.get())),
+ new BeamFieldTypeTestCase(
+ 23,
+ Schema.FieldType.array(
+
Schema.FieldType.array(Schema.FieldType.iterable(Schema.FieldType.STRING))),
+ 26,
+ Types.ListType.ofRequired(
+ 23,
+ Types.ListType.ofRequired(
+ 24, Types.ListType.ofRequired(25,
Types.StringType.get())))));
+
+ checkTypes(listTypes);
+ }
+
+ @Test
+ public void testStructBeamFieldTypeToIcebergFieldType() {
+ // Iceberg sets one field ID for each nested type.
+ List<BeamFieldTypeTestCase> listTypes =
+ Arrays.asList(
+ new BeamFieldTypeTestCase(
+ 1,
+
Schema.FieldType.row(Schema.builder().addStringField("str").build()),
+ 2,
+ Types.StructType.of(
+ Types.NestedField.required(2, "str",
Types.StringType.get()))),
+ new BeamFieldTypeTestCase(
+ 3,
+
Schema.FieldType.row(Schema.builder().addInt32Field("int").build()),
+ 4,
+ Types.StructType.of(
+ Types.NestedField.required(4, "int",
Types.IntegerType.get()))),
+ new BeamFieldTypeTestCase(
+ 0,
+ Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE),
+ 7,
+ Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())),
+ new BeamFieldTypeTestCase(
+ 15,
+ Schema.FieldType.row(
+ Schema.builder()
+ .addArrayField("arr", Schema.FieldType.STRING)
+ .addNullableStringField("str")
+ .build()),
+ 18,
+ Types.StructType.of(
+ Types.NestedField.required(
+ 16, "arr", Types.ListType.ofRequired(17,
Types.StringType.get())),
+ Types.NestedField.optional(18, "str",
Types.StringType.get()))),
+ new BeamFieldTypeTestCase(
+ 20,
+ Schema.FieldType.row(
+ Schema.builder()
+ .addRowField(
+ "row",
+ Schema.builder()
+ .addRowField(
+ "nested_row",
Schema.builder().addStringField("str").build())
+ .build())
+ .addNullableRowField(
+ "nullable_row",
Schema.builder().addInt64Field("long").build())
+ .build()),
+ 25,
+ Types.StructType.of(
+ Types.NestedField.required(
+ 21,
+ "row",
+ Types.StructType.of(
+ Types.NestedField.required(
+ 22,
+ "nested_row",
+ Types.StructType.of(
+ Types.NestedField.required(
+ 23, "str",
Types.StringType.get()))))),
+ Types.NestedField.optional(
+ 24,
+ "nullable_row",
+ Types.StructType.of(
+ Types.NestedField.required(25, "long",
Types.LongType.get()))))));
+
+ checkTypes(listTypes);
+ }
+
+ @Test
+ public void testMapBeamFieldTypeToIcebergFieldType() {
+ List<BeamFieldTypeTestCase> primitives =
+ Arrays.asList(
+ new BeamFieldTypeTestCase(
+ 1,
+ Schema.FieldType.map(Schema.FieldType.STRING,
Schema.FieldType.INT32),
+ 3,
+ Types.MapType.ofRequired(2, 3, Types.StringType.get(),
Types.IntegerType.get())),
+ new BeamFieldTypeTestCase(
+ 6,
+ Schema.FieldType.map(
+ Schema.FieldType.FLOAT,
Schema.FieldType.array(Schema.FieldType.STRING)),
+ 9,
+ Types.MapType.ofRequired(
+ 7,
+ 8,
+ Types.FloatType.get(),
+ Types.ListType.ofRequired(9, Types.StringType.get()))),
+ new BeamFieldTypeTestCase(
+ 10,
+ Schema.FieldType.map(
+ Schema.FieldType.STRING,
+ Schema.FieldType.map(
+ Schema.FieldType.BOOLEAN,
+ Schema.FieldType.map(Schema.FieldType.STRING,
Schema.FieldType.INT32))),
+ 16,
+ Types.MapType.ofRequired(
+ 11,
+ 12,
+ Types.StringType.get(),
+ Types.MapType.ofRequired(
+ 13,
+ 14,
+ Types.BooleanType.get(),
+ Types.MapType.ofRequired(
+ 15, 16, Types.StringType.get(),
Types.IntegerType.get())))),
+ new BeamFieldTypeTestCase(
+ 15,
+ Schema.FieldType.map(
+
Schema.FieldType.row(Schema.builder().addStringField("str").build()),
+
Schema.FieldType.row(Schema.builder().addInt32Field("int").build())),
+ 19,
+ Types.MapType.ofRequired(
+ 16,
+ 17,
+ Types.StructType.of(
+ Types.NestedField.required(18, "str",
Types.StringType.get())),
+ Types.StructType.of(
+ Types.NestedField.required(19, "int",
Types.IntegerType.get())))));
+
+ checkTypes(primitives);
+ }
+
+ static final Schema BEAM_SCHEMA_PRIMITIVE =
+ Schema.builder()
+ .addInt32Field("int")
+ .addFloatField("float")
+ .addNullableDoubleField("double")
+ .addInt64Field("long")
+ .addNullableStringField("str")
+ .addNullableBooleanField("bool")
+ .addByteArrayField("bytes")
+ .build();
+
+ static final org.apache.iceberg.Schema ICEBERG_SCHEMA_PRIMITIVE =
+ new org.apache.iceberg.Schema(
+ required(1, "int", Types.IntegerType.get()),
+ required(2, "float", Types.FloatType.get()),
+ optional(3, "double", Types.DoubleType.get()),
+ required(4, "long", Types.LongType.get()),
+ optional(5, "str", Types.StringType.get()),
+ optional(6, "bool", Types.BooleanType.get()),
+ required(7, "bytes", Types.BinaryType.get()));
+
+ @Test
+ public void testPrimitiveBeamSchemaToIcebergSchema() {
+ org.apache.iceberg.Schema convertedIcebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_PRIMITIVE);
+
+ System.out.println(convertedIcebergSchema);
+ System.out.println(ICEBERG_SCHEMA_PRIMITIVE);
+
+ assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_PRIMITIVE));
+ }
+
+ @Test
+ public void testPrimitiveIcebergSchemaToBeamSchema() {
+ Schema convertedBeamSchema =
IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_PRIMITIVE);
+
+ assertEquals(BEAM_SCHEMA_PRIMITIVE, convertedBeamSchema);
+ }
+
+ static final Schema BEAM_SCHEMA_LIST =
+ Schema.builder()
+ .addIterableField("arr_str", Schema.FieldType.STRING)
+ .addIterableField("arr_int", Schema.FieldType.INT32)
+ .addIterableField("arr_bool", Schema.FieldType.BOOLEAN)
+ .build();
+ static final org.apache.iceberg.Schema ICEBERG_SCHEMA_LIST =
+ new org.apache.iceberg.Schema(
+ required(1, "arr_str", Types.ListType.ofRequired(2,
Types.StringType.get())),
+ required(3, "arr_int", Types.ListType.ofRequired(4,
Types.IntegerType.get())),
+ required(5, "arr_bool", Types.ListType.ofRequired(6,
Types.BooleanType.get())));
+
+ @Test
+ public void testArrayBeamSchemaToIcebergSchema() {
+ org.apache.iceberg.Schema convertedIcebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_LIST);
+
+ assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_LIST));
+ }
+
+ @Test
+ public void testArrayIcebergSchemaToBeamSchema() {
+ Schema convertedBeamSchema =
IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_LIST);
+
+ System.out.println(convertedBeamSchema);
+ System.out.println(BEAM_SCHEMA_LIST);
+
+ assertEquals(BEAM_SCHEMA_LIST, convertedBeamSchema);
+ }
+
+ static final Schema BEAM_SCHEMA_MAP =
+ Schema.builder()
+ .addMapField("str_int", Schema.FieldType.STRING,
Schema.FieldType.INT32)
+ .addNullableMapField("long_bool", Schema.FieldType.INT64,
Schema.FieldType.BOOLEAN)
+ .build();
+
+ static final org.apache.iceberg.Schema ICEBERG_SCHEMA_MAP =
+ new org.apache.iceberg.Schema(
+ required(
+ 1,
+ "str_int",
+ Types.MapType.ofRequired(2, 3, Types.StringType.get(),
Types.IntegerType.get())),
+ optional(
+ 4,
+ "long_bool",
+ Types.MapType.ofRequired(5, 6, Types.LongType.get(),
Types.BooleanType.get())));
+
+ @Test
+ public void testMapBeamSchemaToIcebergSchema() {
+ org.apache.iceberg.Schema convertedIcebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_MAP);
+
+ assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_MAP));
+ }
+
+ @Test
+ public void testMapIcebergSchemaToBeamSchema() {
+ Schema convertedBeamSchema =
IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_MAP);
+
+ assertEquals(BEAM_SCHEMA_MAP, convertedBeamSchema);
+ }
+
+ static final Schema BEAM_SCHEMA_STRUCT =
+ Schema.builder()
+ .addRowField(
+ "row",
+ Schema.builder()
+ .addStringField("str")
+ .addNullableInt32Field("int")
+ .addInt64Field("long")
+ .build())
+ .addNullableRowField(
+ "nullable_row",
+
Schema.builder().addNullableStringField("str").addBooleanField("bool").build())
+ .build();
+
+ static final org.apache.iceberg.Schema ICEBERG_SCHEMA_STRUCT =
+ new org.apache.iceberg.Schema(
+ required(
+ 1,
+ "row",
+ Types.StructType.of(
+ required(2, "str", Types.StringType.get()),
+ optional(3, "int", Types.IntegerType.get()),
+ required(4, "long", Types.LongType.get()))),
+ optional(
+ 5,
+ "nullable_row",
+ Types.StructType.of(
+ optional(6, "str", Types.StringType.get()),
+ required(7, "bool", Types.BooleanType.get()))));
+
+ @Test
+ public void testStructBeamSchemaToIcebergSchema() {
+ org.apache.iceberg.Schema convertedIcebergSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_STRUCT);
+
+ assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_STRUCT));
+ }
+
+ @Test
+ public void testStructIcebergSchemaToBeamSchema() {
+ Schema convertedBeamSchema =
IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_STRUCT);
+
+ assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
+ }
+ }
+}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 75884f4bcf7..a2cd64e2395 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -102,8 +102,7 @@ public class IcebergWriteSchemaTransformProviderTest {
testPipeline
.apply(
"Records To Add",
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
- .setRowSchema(
- SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
+ .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
PCollection<Row> result =
input
@@ -137,7 +136,7 @@ public class IcebergWriteSchemaTransformProviderTest {
PCollection<Row> inputRows =
testPipeline
.apply("Records To Add",
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
-
.setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA));
+
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA));
PCollection<Row> result =
inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(OUTPUT_TAG);
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
index 143687e3c99..007cb028c66 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
@@ -79,7 +79,7 @@ public class ScanSourceTest {
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
- .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ .setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
.build());
BoundedSource.BoundedReader<Row> reader = source.createReader(options);
@@ -121,7 +121,7 @@ public class ScanSourceTest {
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
- .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ .setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
.build());
// Input data for this test is tiny so try a number of very small split
sizes
@@ -167,7 +167,7 @@ public class ScanSourceTest {
.build())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(simpleTable.name().replace("hadoop.", "").split("\\."))
- .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+ .setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
.build());
// Input data for this test is tiny so make sure to split and get a few,
but so they can be
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java
deleted file mode 100644
index 5c708700a17..00000000000
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.values.Row;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(Enclosed.class)
-public class SchemaAndRowConversionsTest {
-
- @RunWith(JUnit4.class)
- public static class RowToRecordTests {
- /**
- * Checks a value that when converted to Iceberg type is the same value
when interpreted in
- * Java.
- */
- private void checkRowValueToRecordValue(
- Schema.FieldType sourceType, Type destType, Object value) {
- checkRowValueToRecordValue(sourceType, value, destType, value);
- }
-
- private void checkRowValueToRecordValue(
- Schema.FieldType sourceType, Object sourceValue, Type destType, Object
destValue) {
- Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType));
- Row row = Row.withSchema(beamSchema).addValues(sourceValue).build();
-
- org.apache.iceberg.Schema icebergSchema =
- new org.apache.iceberg.Schema(required(0, "v", destType));
- Record record = SchemaAndRowConversions.rowToRecord(icebergSchema, row);
-
- assertThat(record.getField("v"), equalTo(destValue));
- }
-
- @Test
- public void testBoolean() throws Exception {
- checkRowValueToRecordValue(Schema.FieldType.BOOLEAN,
Types.BooleanType.get(), true);
- checkRowValueToRecordValue(Schema.FieldType.BOOLEAN,
Types.BooleanType.get(), false);
- }
-
- @Test
- public void testInteger() throws Exception {
- checkRowValueToRecordValue(Schema.FieldType.INT32,
Types.IntegerType.get(), -13);
- checkRowValueToRecordValue(Schema.FieldType.INT32,
Types.IntegerType.get(), 42);
- checkRowValueToRecordValue(Schema.FieldType.INT32,
Types.IntegerType.get(), 0);
- }
-
- @Test
- public void testLong() throws Exception {
- checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(),
13L);
- checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(),
42L);
- }
-
- @Test
- public void testFloat() throws Exception {
- checkRowValueToRecordValue(Schema.FieldType.FLOAT,
Types.FloatType.get(), 3.14159f);
- checkRowValueToRecordValue(Schema.FieldType.FLOAT,
Types.FloatType.get(), 42.0f);
- }
-
- @Test
- public void testDouble() throws Exception {
- checkRowValueToRecordValue(Schema.FieldType.DOUBLE,
Types.DoubleType.get(), 3.14159);
- }
-
- @Test
- public void testDate() throws Exception {}
-
- @Test
- public void testTime() throws Exception {}
-
- @Test
- public void testTimestamp() throws Exception {
- DateTime dateTime =
- new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3,
4).withZone(DateTimeZone.UTC);
-
- checkRowValueToRecordValue(
- Schema.FieldType.DATETIME,
- dateTime.toInstant(),
- Types.TimestampType.withoutZone(),
- dateTime.getMillis());
- }
-
- @Test
- public void testFixed() throws Exception {}
-
- @Test
- public void testBinary() throws Exception {
- byte[] bytes = new byte[] {1, 2, 3, 4};
- checkRowValueToRecordValue(
- Schema.FieldType.BYTES, bytes, Types.BinaryType.get(),
ByteBuffer.wrap(bytes));
- }
-
- @Test
- public void testDecimal() throws Exception {}
-
- @Test
- public void testStruct() throws Exception {}
-
- @Test
- public void testMap() throws Exception {}
-
- @Test
- public void testList() throws Exception {}
- }
-
- @RunWith(JUnit4.class)
- public static class RecordToRowTests {
- private void checkRecordValueToRowValue(
- Type sourceType, Schema.FieldType destType, Object value) {
- checkRecordValueToRowValue(sourceType, value, destType, value);
- }
-
- private void checkRecordValueToRowValue(
- Type sourceType, Object sourceValue, Schema.FieldType destType, Object
destValue) {
- Schema beamSchema = Schema.of(Schema.Field.of("v", destType));
-
- org.apache.iceberg.Schema icebergSchema =
- new org.apache.iceberg.Schema(required(0, "v", sourceType));
- Record record = GenericRecord.create(icebergSchema);
- record.setField("v", sourceValue);
-
- Row row = SchemaAndRowConversions.recordToRow(beamSchema, record);
-
- assertThat(row.getBaseValue("v"), equalTo(destValue));
- }
-
- @Test
- public void testBoolean() throws Exception {
- checkRecordValueToRowValue(Types.BooleanType.get(),
Schema.FieldType.BOOLEAN, true);
- checkRecordValueToRowValue(Types.BooleanType.get(),
Schema.FieldType.BOOLEAN, false);
- }
-
- @Test
- public void testInteger() throws Exception {
- checkRecordValueToRowValue(Types.IntegerType.get(),
Schema.FieldType.INT32, -13);
- checkRecordValueToRowValue(Types.IntegerType.get(),
Schema.FieldType.INT32, 42);
- checkRecordValueToRowValue(Types.IntegerType.get(),
Schema.FieldType.INT32, 0);
- }
-
- @Test
- public void testLong() throws Exception {
- checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64,
13L);
- checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64,
42L);
- }
-
- @Test
- public void testFloat() throws Exception {
- checkRecordValueToRowValue(Types.FloatType.get(),
Schema.FieldType.FLOAT, 3.14159f);
- checkRecordValueToRowValue(Types.FloatType.get(),
Schema.FieldType.FLOAT, 42.0f);
- }
-
- @Test
- public void testDouble() throws Exception {
- checkRecordValueToRowValue(Types.DoubleType.get(),
Schema.FieldType.DOUBLE, 3.14159);
- }
-
- @Test
- public void testDate() throws Exception {}
-
- @Test
- public void testTime() throws Exception {}
-
- @Test
- public void testTimestamp() throws Exception {
- DateTime dateTime =
- new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3,
4).withZone(DateTimeZone.UTC);
-
- checkRecordValueToRowValue(
- Types.TimestampType.withoutZone(),
- dateTime.getMillis(),
- Schema.FieldType.DATETIME,
- dateTime.toInstant());
- }
-
- @Test
- public void testFixed() throws Exception {}
-
- @Test
- public void testBinary() throws Exception {
- byte[] bytes = new byte[] {1, 2, 3, 4};
- checkRecordValueToRowValue(
- Types.BinaryType.get(), ByteBuffer.wrap(bytes),
Schema.FieldType.BYTES, bytes);
- }
-
- @Test
- public void testDecimal() throws Exception {}
-
- @Test
- public void testStruct() throws Exception {}
-
- @Test
- public void testMap() throws Exception {}
-
- @Test
- public void testList() throws Exception {}
- }
-
- @RunWith(JUnit4.class)
- public static class SchemaTests {
- static final Schema BEAM_SCHEMA =
- Schema.builder()
- .addInt32Field("int")
- .addFloatField("float")
- .addDoubleField("double")
- .addInt64Field("long")
- .addStringField("str")
- .addBooleanField("bool")
- .addByteArrayField("bytes")
- .build();
-
- static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
- new org.apache.iceberg.Schema(
- Types.NestedField.required(1, "int", Types.IntegerType.get()),
- Types.NestedField.required(2, "float", Types.FloatType.get()),
- Types.NestedField.required(3, "double", Types.DoubleType.get()),
- Types.NestedField.required(4, "long", Types.LongType.get()),
- Types.NestedField.required(5, "str", Types.StringType.get()),
- Types.NestedField.required(6, "bool", Types.BooleanType.get()),
- Types.NestedField.required(7, "bytes", Types.BinaryType.get()));
-
- @Test
- public void testBeamSchemaToIcebergSchema() {
- org.apache.iceberg.Schema convertedIcebergSchema =
- SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA);
-
- assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA));
- }
-
- @Test
- public void testIcebergSchemaToBeamSchema() {
- Schema convertedBeamSchema =
- SchemaAndRowConversions.icebergSchemaToBeamSchema(ICEBERG_SCHEMA);
-
- assertEquals(BEAM_SCHEMA, convertedBeamSchema);
- }
- }
-}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java
index 4048e88398a..6143bd03491 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java
@@ -91,7 +91,7 @@ public class TestFixtures {
ArrayList<Row> rows = new ArrayList<>();
for (Record record : records) {
rows.add(
- Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(SCHEMA))
+ Row.withSchema(IcebergUtils.icebergSchemaToBeamSchema(SCHEMA))
.withFieldValue("id", record.getField("id"))
.withFieldValue("data", record.getField("data"))
.build());