This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aefd5b9c0c Improve IcebergIO utils (#31958)
2aefd5b9c0c is described below

commit 2aefd5b9c0c8a4dcb577d367d859455e4a93ec14
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Jul 31 14:25:14 2024 -0400

    Improve IcebergIO utils (#31958)
    
    * Improve Iceberg utils
    
    * add documentation; clarify variable name
    
    * fix kinks, add type tests
---
 .../org/apache/beam/sdk/io/iceberg/IcebergIO.java  |   2 +-
 ...emaAndRowConversions.java => IcebergUtils.java} | 217 +++++--
 .../apache/beam/sdk/io/iceberg/RecordWriter.java   |   4 +-
 .../org/apache/beam/sdk/io/iceberg/ScanSource.java |   2 +-
 .../apache/beam/sdk/io/iceberg/ScanTaskReader.java |   4 +-
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java    |   5 +-
 .../beam/sdk/io/iceberg/IcebergIOReadTest.java     |   8 +-
 .../beam/sdk/io/iceberg/IcebergIOWriteTest.java    |  12 +-
 .../IcebergReadSchemaTransformProviderTest.java    |   8 +-
 .../beam/sdk/io/iceberg/IcebergUtilsTest.java      | 676 +++++++++++++++++++++
 .../IcebergWriteSchemaTransformProviderTest.java   |   5 +-
 .../apache/beam/sdk/io/iceberg/ScanSourceTest.java |   6 +-
 .../io/iceberg/SchemaAndRowConversionsTest.java    | 268 --------
 .../apache/beam/sdk/io/iceberg/TestFixtures.java   |   2 +-
 14 files changed, 865 insertions(+), 354 deletions(-)

diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 50e0ea8b63d..c3c1da7c788 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -137,7 +137,7 @@ public class IcebergIO {
                       .setCatalogConfig(getCatalogConfig())
                       .setScanType(IcebergScanConfig.ScanType.TABLE)
                       .setTableIdentifier(tableId)
-                      
.setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
+                      
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(table.schema()))
                       .build())));
     }
   }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
similarity index 50%
rename from 
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java
rename to 
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index e1a8685614f..a2f84e6475c 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -20,36 +20,42 @@ package org.apache.beam.sdk.io.iceberg;
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-class SchemaAndRowConversions {
+/** Utilities for converting between Beam and Iceberg types. */
+public class IcebergUtils {
+  // This is made public for users' convenience, as many may have more 
experience working with
+  // Iceberg types.
 
-  private SchemaAndRowConversions() {}
+  private IcebergUtils() {}
 
-  static final Map<Schema.FieldType, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
-      ImmutableMap.<Schema.FieldType, Type>builder()
-          .put(Schema.FieldType.BOOLEAN, Types.BooleanType.get())
-          .put(Schema.FieldType.INT32, Types.IntegerType.get())
-          .put(Schema.FieldType.INT64, Types.LongType.get())
-          .put(Schema.FieldType.FLOAT, Types.FloatType.get())
-          .put(Schema.FieldType.DOUBLE, Types.DoubleType.get())
-          .put(Schema.FieldType.STRING, Types.StringType.get())
-          .put(Schema.FieldType.BYTES, Types.BinaryType.get())
+  private static final Map<Schema.TypeName, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
+      ImmutableMap.<Schema.TypeName, Type>builder()
+          .put(Schema.TypeName.BOOLEAN, Types.BooleanType.get())
+          .put(Schema.TypeName.INT32, Types.IntegerType.get())
+          .put(Schema.TypeName.INT64, Types.LongType.get())
+          .put(Schema.TypeName.FLOAT, Types.FloatType.get())
+          .put(Schema.TypeName.DOUBLE, Types.DoubleType.get())
+          .put(Schema.TypeName.STRING, Types.StringType.get())
+          .put(Schema.TypeName.BYTES, Types.BinaryType.get())
           .build();
 
-  public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
+  private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
     switch (type.typeId()) {
       case BOOLEAN:
         return Schema.FieldType.BOOLEAN;
@@ -86,11 +92,12 @@ class SchemaAndRowConversions {
     throw new RuntimeException("Unrecognized IcebergIO Type");
   }
 
-  public static Schema.Field icebergFieldToBeamField(final Types.NestedField 
field) {
+  private static Schema.Field icebergFieldToBeamField(final Types.NestedField 
field) {
     return Schema.Field.of(field.name(), 
icebergTypeToBeamFieldType(field.type()))
         .withNullable(field.isOptional());
   }
 
+  /** Converts an Iceberg {@link org.apache.iceberg.Schema} to a Beam {@link 
Schema}. */
   public static Schema icebergSchemaToBeamSchema(final 
org.apache.iceberg.Schema schema) {
     Schema.Builder builder = Schema.builder();
     for (Types.NestedField f : schema.columns()) {
@@ -99,7 +106,7 @@ class SchemaAndRowConversions {
     return builder.build();
   }
 
-  public static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
+  private static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
     Schema.Builder builder = Schema.builder();
     for (Types.NestedField f : struct.fields()) {
       builder.addField(icebergFieldToBeamField(f));
@@ -107,28 +114,141 @@ class SchemaAndRowConversions {
     return builder.build();
   }
 
-  public static Types.NestedField beamFieldToIcebergField(int fieldId, final 
Schema.Field field) {
-    @Nullable Type icebergType = 
BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+  /**
+   * Represents an Object (in practice, either {@link Type} or {@link 
Types.NestedField}) along with
+   * the most recent (max) ID that has been used to build this object.
+   *
+   * <p>Iceberg Schema fields are required to have unique IDs. This includes 
unique IDs for a {@link
+   * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and 
value type, and
+   * nested {@link Types.StructType}s. When constructing any of these types, 
we use multiple unique
+   * IDs for the type's components. The {@code maxId} in this object 
represents the most recent ID
+   * used after building this type. This helps signal that the next field we 
construct should have
+   * an ID greater than this one.
+   */
+  @VisibleForTesting
+  static class ObjectAndMaxId<T> {
+    int maxId;
+    T object;
 
-    if (icebergType != null) {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
icebergType);
-    } else {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
Types.StringType.get());
+    ObjectAndMaxId(int id, T object) {
+      this.maxId = id;
+      this.object = object;
     }
   }
 
+  /**
+   * Given a Beam {@link Schema.FieldType} and an index, returns an Iceberg 
{@link Type} and the
+   * maximum index after building the Iceberg Type. This assumes the input 
index is already in use
+   * (usually by the parent {@link Types.NestedField}), and will start building 
the Iceberg type from
+   * index + 1.
+   *
+   * <p>Returns this information in an {@link ObjectAndMaxId<Type>} instance.
+   */
+  @VisibleForTesting
+  static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+      int fieldId, Schema.FieldType beamType) {
+    if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+      return new ObjectAndMaxId<>(fieldId, 
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+    } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or 
ITERABLE
+      // List ID needs to be unique from the NestedField that contains this 
ListType
+      int listId = fieldId + 1;
+      Schema.FieldType beamCollectionType =
+          
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
+
+      ObjectAndMaxId<Type> listInfo = beamFieldTypeToIcebergFieldType(listId, 
beamCollectionType);
+      Type icebergCollectionType = listInfo.object;
+
+      boolean elementTypeIsNullable =
+          
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
+
+      Type listType =
+          elementTypeIsNullable
+              ? Types.ListType.ofOptional(listId, icebergCollectionType)
+              : Types.ListType.ofRequired(listId, icebergCollectionType);
+
+      return new ObjectAndMaxId<>(listInfo.maxId, listType);
+    } else if (beamType.getTypeName().isMapType()) { // MAP
+      // key and value IDs need to be unique from the NestedField that 
contains this MapType
+      int keyId = fieldId + 1;
+      int valueId = fieldId + 2;
+      int maxId = valueId;
+
+      Schema.FieldType beamKeyType = 
Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
+      ObjectAndMaxId<Type> keyInfo = beamFieldTypeToIcebergFieldType(maxId, 
beamKeyType);
+      Type icebergKeyType = keyInfo.object;
+      maxId = keyInfo.maxId;
+
+      Schema.FieldType beamValueType =
+          Preconditions.checkArgumentNotNull(beamType.getMapValueType());
+      ObjectAndMaxId<Type> valueInfo = beamFieldTypeToIcebergFieldType(maxId, 
beamValueType);
+      Type icebergValueType = valueInfo.object;
+      maxId = valueInfo.maxId;
+
+      Type mapType =
+          beamValueType.getNullable()
+              ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, 
icebergValueType)
+              : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, 
icebergValueType);
+
+      return new ObjectAndMaxId<>(maxId, mapType);
+    } else if (beamType.getTypeName().isCompositeType()) { // ROW
+      // Nested field IDs need to be unique from the field that contains this 
StructType
+      int maxFieldId = fieldId;
+
+      Schema nestedSchema = 
Preconditions.checkArgumentNotNull(beamType.getRowSchema());
+      List<Types.NestedField> nestedFields = new 
ArrayList<>(nestedSchema.getFieldCount());
+      for (Schema.Field field : nestedSchema.getFields()) {
+        ObjectAndMaxId<Types.NestedField> converted = 
beamFieldToIcebergField(++maxFieldId, field);
+        Types.NestedField nestedField = converted.object;
+        nestedFields.add(nestedField);
+
+        maxFieldId = converted.maxId;
+      }
+
+      Type structType = Types.StructType.of(nestedFields);
+
+      return new ObjectAndMaxId<>(maxFieldId, structType);
+    }
+
+    return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
+  }
+
+  private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
+      int fieldId, final Schema.Field field) {
+    ObjectAndMaxId<Type> typeAndMaxId = 
beamFieldTypeToIcebergFieldType(fieldId, field.getType());
+    Type icebergType = typeAndMaxId.object;
+    int id = typeAndMaxId.maxId;
+
+    Types.NestedField icebergField =
+        Types.NestedField.of(fieldId, field.getType().getNullable(), 
field.getName(), icebergType);
+
+    return new ObjectAndMaxId<>(id, icebergField);
+  }
+
+  /**
+   * Converts a Beam {@link Schema} to an Iceberg {@link 
org.apache.iceberg.Schema}.
+   *
+   * <p>The following unsupported Beam types will be defaulted to {@link 
Types.StringType}:
+   * <li>{@link Schema.TypeName#DECIMAL}
+   * <li>{@link Schema.TypeName#DATETIME}
+   * <li>{@link Schema.TypeName#LOGICAL_TYPE}
+   */
   public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final 
Schema schema) {
     Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
-    int fieldId = 0;
-    for (Schema.Field f : schema.getFields()) {
-      fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
+    int nextIcebergFieldId = 1;
+    for (int i = 0; i < schema.getFieldCount(); i++) {
+      Schema.Field beamField = schema.getField(i);
+      ObjectAndMaxId<Types.NestedField> fieldAndMaxId =
+          beamFieldToIcebergField(nextIcebergFieldId, beamField);
+      Types.NestedField field = fieldAndMaxId.object;
+      fields[i] = field;
+
+      nextIcebergFieldId = fieldAndMaxId.maxId + 1;
     }
     return new org.apache.iceberg.Schema(fields);
   }
 
-  public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) {
+  /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
+  public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema 
schema, Row row) {
     return copyRowIntoRecord(GenericRecord.create(schema), row);
   }
 
@@ -191,13 +311,16 @@ class SchemaAndRowConversions {
                         
copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
         break;
       case LIST:
-        throw new UnsupportedOperationException("List fields are not yet 
supported.");
+        Optional.ofNullable(value.getArray(name)).ifPresent(list -> 
rec.setField(name, list));
+        break;
       case MAP:
-        throw new UnsupportedOperationException("Map fields are not yet 
supported.");
+        Optional.ofNullable(value.getMap(name)).ifPresent(v -> 
rec.setField(name, v));
+        break;
     }
   }
 
-  public static Row recordToRow(Schema schema, Record record) {
+  /** Converts an Iceberg {@link Record} to a Beam {@link Row}. */
+  public static Row icebergRecordToBeamRow(Schema schema, Record record) {
     Row.Builder rowBuilder = Row.withSchema(schema);
     for (Schema.Field field : schema.getFields()) {
       switch (field.getType().getTypeName()) {
@@ -221,20 +344,14 @@ class SchemaAndRowConversions {
           long longValue = (long) record.getField(field.getName());
           rowBuilder.addValue(longValue);
           break;
-        case DECIMAL:
-          // Iceberg and Beam both use BigDecimal
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
-        case FLOAT:
-          // Iceberg and Beam both use float
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
-        case DOUBLE:
-          // Iceberg and Beam both use double
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
-        case STRING:
-          // Iceberg and Beam both use String
+        case DECIMAL: // Iceberg and Beam both use BigDecimal
+        case FLOAT: // Iceberg and Beam both use float
+        case DOUBLE: // Iceberg and Beam both use double
+        case STRING: // Iceberg and Beam both use String
+        case BOOLEAN: // Iceberg and Beam both use boolean
+        case ARRAY:
+        case ITERABLE:
+        case MAP:
           rowBuilder.addValue(record.getField(field.getName()));
           break;
         case DATETIME:
@@ -242,27 +359,17 @@ class SchemaAndRowConversions {
           long millis = (long) record.getField(field.getName());
           rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
           break;
-        case BOOLEAN:
-          // Iceberg and Beam both use String
-          rowBuilder.addValue(record.getField(field.getName()));
-          break;
         case BYTES:
           // Iceberg uses ByteBuffer; Beam uses byte[]
           rowBuilder.addValue(((ByteBuffer) 
record.getField(field.getName())).array());
           break;
-        case ARRAY:
-          throw new UnsupportedOperationException("Array fields are not yet 
supported.");
-        case ITERABLE:
-          throw new UnsupportedOperationException("Iterable fields are not yet 
supported.");
-        case MAP:
-          throw new UnsupportedOperationException("Map fields are not yet 
supported.");
         case ROW:
           Record nestedRecord = (Record) record.getField(field.getName());
           Schema nestedSchema =
               checkArgumentNotNull(
                   field.getType().getRowSchema(),
                   "Corrupted schema: Row type did not have associated nested 
schema.");
-          Row nestedRow = recordToRow(nestedSchema, nestedRecord);
+          Row nestedRow = icebergRecordToBeamRow(nestedSchema, nestedRecord);
           rowBuilder.addValue(nestedRow);
           break;
         case LOGICAL_TYPE:
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index 859310bdcec..d7212783d1b 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
-import static 
org.apache.beam.sdk.io.iceberg.SchemaAndRowConversions.rowToRecord;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;
 
 import java.io.IOException;
 import org.apache.beam.sdk.values.Row;
@@ -80,7 +80,7 @@ class RecordWriter {
   }
 
   public void write(Row row) {
-    Record record = rowToRecord(table.schema(), row);
+    Record record = beamRowToIcebergRecord(table.schema(), row);
     icebergDataWriter.write(record);
   }
 
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
index ed2f2eda767..ff2aa083348 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanSource.java
@@ -50,7 +50,7 @@ class ScanSource extends BoundedSource<Row> {
         scanConfig
             .getTable()
             .newScan()
-            
.project(SchemaAndRowConversions.beamSchemaToIcebergSchema(scanConfig.getSchema()));
+            
.project(IcebergUtils.beamSchemaToIcebergSchema(scanConfig.getSchema()));
 
     if (scanConfig.getFilter() != null) {
       tableScan = tableScan.filter(scanConfig.getFilter());
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
index 52e6d60c1fb..b7cb42b2eac 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java
@@ -60,7 +60,7 @@ class ScanTaskReader extends BoundedSource.BoundedReader<Row> 
{
 
   public ScanTaskReader(ScanTaskSource source) {
     this.source = source;
-    this.project = 
SchemaAndRowConversions.beamSchemaToIcebergSchema(source.getSchema());
+    this.project = IcebergUtils.beamSchemaToIcebergSchema(source.getSchema());
   }
 
   @Override
@@ -160,7 +160,7 @@ class ScanTaskReader extends 
BoundedSource.BoundedReader<Row> {
     if (current == null) {
       throw new NoSuchElementException();
     }
-    return SchemaAndRowConversions.recordToRow(source.getSchema(), current);
+    return IcebergUtils.icebergRecordToBeamRow(source.getSchema(), current);
   }
 
   @Override
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 1c5686bfde9..3a169eeb40d 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -134,8 +134,7 @@ public class IcebergIOIT implements Serializable {
           .addByteArrayField("bytes")
           .build();
 
-  static final Schema ICEBERG_SCHEMA =
-      SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+  static final Schema ICEBERG_SCHEMA = 
IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA);
 
   Map<String, Object> getValues(int num) {
     String strNum = Integer.toString(num);
@@ -238,7 +237,7 @@ public class IcebergIOIT implements Serializable {
 
     List<Row> inputRows =
         inputRecords.stream()
-            .map(record -> SchemaAndRowConversions.recordToRow(BEAM_SCHEMA, 
record))
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(BEAM_SCHEMA, 
record))
             .collect(Collectors.toList());
 
     // Write with Beam
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
index d6db3f68911..3f31073b444 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOReadTest.java
@@ -70,7 +70,7 @@ public class IcebergIOReadTest {
     TableIdentifier tableId =
         TableIdentifier.of("default", "table" + 
Long.toString(UUID.randomUUID().hashCode(), 16));
     Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-    final Schema schema = 
SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+    final Schema schema = 
IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
 
     simpleTable
         .newFastAppend()
@@ -91,7 +91,7 @@ public class IcebergIOReadTest {
                 TestFixtures.FILE2SNAPSHOT1,
                 TestFixtures.FILE3SNAPSHOT1)
             .flatMap(List::stream)
-            .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
             .collect(Collectors.toList());
 
     Properties props = new Properties();
@@ -105,9 +105,7 @@ public class IcebergIOReadTest {
         testPipeline
             .apply(IcebergIO.readRows(catalogConfig).from(tableId))
             .apply(ParDo.of(new PrintRow()))
-            .setCoder(
-                RowCoder.of(
-                    
SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
+            
.setCoder(RowCoder.of(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
 
     PAssert.that(output)
         .satisfies(
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index e0a584ec9da..02213c45e07 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
-import static 
org.apache.beam.sdk.io.iceberg.SchemaAndRowConversions.rowToRecord;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.io.Serializable;
@@ -85,7 +85,7 @@ public class IcebergIOWriteTest implements Serializable {
 
     testPipeline
         .apply("Records To Add", 
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
-        
.setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
         .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId));
 
     LOG.info("Executing pipeline");
@@ -152,7 +152,7 @@ public class IcebergIOWriteTest implements Serializable {
                         TestFixtures.FILE1SNAPSHOT1,
                         TestFixtures.FILE1SNAPSHOT2,
                         TestFixtures.FILE1SNAPSHOT3))))
-        
.setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
         .apply("Append To Table", 
IcebergIO.writeRows(catalog).to(dynamicDestinations));
 
     LOG.info("Executing pipeline");
@@ -235,7 +235,7 @@ public class IcebergIOWriteTest implements Serializable {
 
     testPipeline
         .apply("Records To Add", Create.of(TestFixtures.asRows(elements)))
-        
.setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+        
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
         .apply("Append To Table", 
IcebergIO.writeRows(catalog).to(dynamicDestinations));
 
     LOG.info("Executing pipeline");
@@ -262,9 +262,9 @@ public class IcebergIOWriteTest implements Serializable {
     // Create a table and add records to it.
     Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
     Record record =
-        rowToRecord(
+        beamRowToIcebergRecord(
             table.schema(),
-            
Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+            
Row.withSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
                 .addValues(42L, "bizzle")
                 .build());
 
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
index bc15021fa2b..effb5cc4838 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergReadSchemaTransformProviderTest.java
@@ -73,7 +73,7 @@ public class IcebergReadSchemaTransformProviderTest {
     TableIdentifier tableId = TableIdentifier.parse(identifier);
 
     Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-    final Schema schema = 
SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+    final Schema schema = 
IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
 
     simpleTable
         .newFastAppend()
@@ -94,7 +94,7 @@ public class IcebergReadSchemaTransformProviderTest {
                 TestFixtures.FILE2SNAPSHOT1,
                 TestFixtures.FILE3SNAPSHOT1)
             .flatMap(List::stream)
-            .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
             .collect(Collectors.toList());
 
     Map<String, String> properties = new HashMap<>();
@@ -129,7 +129,7 @@ public class IcebergReadSchemaTransformProviderTest {
     TableIdentifier tableId = TableIdentifier.parse(identifier);
 
     Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-    final Schema schema = 
SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
+    final Schema schema = 
IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA);
 
     simpleTable
         .newFastAppend()
@@ -150,7 +150,7 @@ public class IcebergReadSchemaTransformProviderTest {
                 TestFixtures.FILE2SNAPSHOT1,
                 TestFixtures.FILE3SNAPSHOT1)
             .flatMap(List::stream)
-            .map(record -> SchemaAndRowConversions.recordToRow(schema, record))
+            .map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
             .collect(Collectors.toList());
 
     String yamlConfig =
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
new file mode 100644
index 00000000000..c4da0b22f4d
--- /dev/null
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -0,0 +1,676 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.ObjectAndMaxId;
+import static 
org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(Enclosed.class)
+public class IcebergUtilsTest {
+
+  /** Tests converting individual Beam {@link Row} values into Iceberg {@link Record} values. */
+  @RunWith(JUnit4.class)
+  public static class RowToRecordTests {
+    /**
+     * Checks a value that, when converted to the Iceberg type, is expected to be the same Java
+     * value that was put into the Beam row.
+     *
+     * @param sourceType Beam type of the single field in the input row
+     * @param destType Iceberg type of the single field in the output record
+     * @param value value used both as the row input and as the expected record field
+     */
+    private void checkRowValueToRecordValue(
+        Schema.FieldType sourceType, Type destType, Object value) {
+      checkRowValueToRecordValue(sourceType, value, destType, value);
+    }
+
+    /**
+     * Wraps {@code sourceValue} in a Beam row with a single field {@code "v"}, converts the row
+     * with {@link IcebergUtils#beamRowToIcebergRecord} against an Iceberg schema holding one
+     * required field {@code "v"} of {@code destType}, and asserts that the converted field equals
+     * {@code destValue}.
+     *
+     * @param sourceType Beam type of the input field
+     * @param sourceValue value stored in the Beam row
+     * @param destType Iceberg type of the output field
+     * @param destValue value expected in the Iceberg record
+     */
+    private void checkRowValueToRecordValue(
+        Schema.FieldType sourceType, Object sourceValue, Type destType, Object destValue) {
+      Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType));
+      Row row = Row.withSchema(beamSchema).addValues(sourceValue).build();
+
+      org.apache.iceberg.Schema icebergSchema =
+          new org.apache.iceberg.Schema(required(0, "v", destType));
+      Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
+
+      assertThat(record.getField("v"), equalTo(destValue));
+    }
+
+    @Test
+    public void testBoolean() {
+      checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), true);
+      checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, Types.BooleanType.get(), false);
+    }
+
+    @Test
+    public void testInteger() {
+      checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), -13);
+      checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 42);
+      checkRowValueToRecordValue(Schema.FieldType.INT32, Types.IntegerType.get(), 0);
+    }
+
+    @Test
+    public void testLong() {
+      checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 13L);
+      checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 42L);
+    }
+
+    @Test
+    public void testFloat() {
+      checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 3.14159f);
+      checkRowValueToRecordValue(Schema.FieldType.FLOAT, Types.FloatType.get(), 42.0f);
+    }
+
+    @Test
+    public void testDouble() {
+      checkRowValueToRecordValue(Schema.FieldType.DOUBLE, Types.DoubleType.get(), 3.14159);
+    }
+
+    // TODO: placeholder that currently asserts nothing; add DATE conversion coverage.
+    @Test
+    public void testDate() {}
+
+    // TODO: placeholder that currently asserts nothing; add TIME conversion coverage.
+    @Test
+    public void testTime() {}
+
+    @Test
+    public void testTimestamp() {
+      // Build the instant directly in UTC so the value under test does not depend on the JVM's
+      // default time zone.
+      DateTime dateTime = new DateTime(1979, 3, 14, 1, 2, 3, 4, DateTimeZone.UTC);
+
+      checkRowValueToRecordValue(
+          Schema.FieldType.DATETIME,
+          dateTime.toInstant(),
+          Types.TimestampType.withoutZone(),
+          dateTime.getMillis());
+    }
+
+    // TODO: placeholder that currently asserts nothing; add FIXED conversion coverage.
+    @Test
+    public void testFixed() {}
+
+    @Test
+    public void testBinary() {
+      byte[] bytes = new byte[] {1, 2, 3, 4};
+      checkRowValueToRecordValue(
+          Schema.FieldType.BYTES, bytes, Types.BinaryType.get(), ByteBuffer.wrap(bytes));
+    }
+
+    @Test
+    public void testDecimal() {
+      BigDecimal num = BigDecimal.valueOf(123.456);
+
+      checkRowValueToRecordValue(Schema.FieldType.DECIMAL, Types.DecimalType.of(6, 3), num);
+    }
+
+    @Test
+    public void testStruct() {
+      Schema schema = Schema.builder().addStringField("nested_str").build();
+      Row beamRow = Row.withSchema(schema).addValue("str_value").build();
+
+      Types.NestedField nestedFieldType = required(1, "nested_str", Types.StringType.get());
+      GenericRecord icebergRow =
+          GenericRecord.create(new org.apache.iceberg.Schema(nestedFieldType));
+      icebergRow.setField("nested_str", "str_value");
+
+      checkRowValueToRecordValue(
+          Schema.FieldType.row(schema), beamRow, Types.StructType.of(nestedFieldType), icebergRow);
+    }
+
+    @Test
+    public void testMap() {
+      Map<String, Integer> map =
+          ImmutableMap.<String, Integer>builder().put("a", 123).put("b", 456).put("c", 789).build();
+
+      checkRowValueToRecordValue(
+          Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32),
+          Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.IntegerType.get()),
+          map);
+    }
+
+    @Test
+    public void testList() {
+      List<String> list = Arrays.asList("abc", "xyz", "123", "foo", "bar");
+
+      checkRowValueToRecordValue(
+          Schema.FieldType.array(Schema.FieldType.STRING),
+          Types.ListType.ofRequired(1, Types.StringType.get()),
+          list);
+    }
+  }
+
+  /** Tests converting individual Iceberg {@link Record} values into Beam {@link Row} values. */
+  @RunWith(JUnit4.class)
+  public static class RecordToRowTests {
+    /**
+     * Checks a value that, when converted to the Beam type, is expected to be the same Java value
+     * that was put into the Iceberg record.
+     *
+     * @param sourceType Iceberg type of the single field in the input record
+     * @param destType Beam type of the single field in the output row
+     * @param value value used both as the record input and as the expected row field
+     */
+    private void checkRecordValueToRowValue(
+        Type sourceType, Schema.FieldType destType, Object value) {
+      checkRecordValueToRowValue(sourceType, value, destType, value);
+    }
+
+    /**
+     * Stores {@code sourceValue} in an Iceberg record with a single required field {@code "v"} of
+     * {@code sourceType}, converts the record with {@link IcebergUtils#icebergRecordToBeamRow}
+     * against a Beam schema holding one field {@code "v"} of {@code destType}, and asserts that
+     * the base value of the converted field equals {@code destValue}.
+     *
+     * @param sourceType Iceberg type of the input field
+     * @param sourceValue value stored in the Iceberg record
+     * @param destType Beam type of the output field
+     * @param destValue value expected in the Beam row
+     */
+    private void checkRecordValueToRowValue(
+        Type sourceType, Object sourceValue, Schema.FieldType destType, Object destValue) {
+      Schema beamSchema = Schema.of(Schema.Field.of("v", destType));
+
+      org.apache.iceberg.Schema icebergSchema =
+          new org.apache.iceberg.Schema(required(0, "v", sourceType));
+      Record record = GenericRecord.create(icebergSchema);
+      record.setField("v", sourceValue);
+
+      Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
+
+      assertThat(row.getBaseValue("v"), equalTo(destValue));
+    }
+
+    @Test
+    public void testBoolean() {
+      checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, true);
+      checkRecordValueToRowValue(Types.BooleanType.get(), Schema.FieldType.BOOLEAN, false);
+    }
+
+    @Test
+    public void testInteger() {
+      checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, -13);
+      checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 42);
+      checkRecordValueToRowValue(Types.IntegerType.get(), Schema.FieldType.INT32, 0);
+    }
+
+    @Test
+    public void testLong() {
+      checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 13L);
+      checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 42L);
+    }
+
+    @Test
+    public void testFloat() {
+      checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 3.14159f);
+      checkRecordValueToRowValue(Types.FloatType.get(), Schema.FieldType.FLOAT, 42.0f);
+    }
+
+    @Test
+    public void testDouble() {
+      checkRecordValueToRowValue(Types.DoubleType.get(), Schema.FieldType.DOUBLE, 3.14159);
+    }
+
+    // TODO: placeholder that currently asserts nothing; add DATE conversion coverage.
+    @Test
+    public void testDate() {}
+
+    // TODO: placeholder that currently asserts nothing; add TIME conversion coverage.
+    @Test
+    public void testTime() {}
+
+    @Test
+    public void testTimestamp() {
+      // Build the instant directly in UTC so the value under test does not depend on the JVM's
+      // default time zone.
+      DateTime dateTime = new DateTime(1979, 3, 14, 1, 2, 3, 4, DateTimeZone.UTC);
+
+      checkRecordValueToRowValue(
+          Types.TimestampType.withoutZone(),
+          dateTime.getMillis(),
+          Schema.FieldType.DATETIME,
+          dateTime.toInstant());
+    }
+
+    // TODO: placeholder that currently asserts nothing; add FIXED conversion coverage.
+    @Test
+    public void testFixed() {}
+
+    @Test
+    public void testBinary() {
+      byte[] bytes = new byte[] {1, 2, 3, 4};
+      checkRecordValueToRowValue(
+          Types.BinaryType.get(), ByteBuffer.wrap(bytes), Schema.FieldType.BYTES, bytes);
+    }
+
+    @Test
+    public void testDecimal() {
+      BigDecimal num = BigDecimal.valueOf(123.456);
+
+      checkRecordValueToRowValue(Types.DecimalType.of(6, 3), Schema.FieldType.DECIMAL, num);
+    }
+
+    @Test
+    public void testStruct() {
+      Schema schema = Schema.builder().addStringField("nested_str").build();
+      Row beamRow = Row.withSchema(schema).addValue("str_value").build();
+
+      Types.NestedField nestedFieldType = required(1, "nested_str", Types.StringType.get());
+      GenericRecord icebergRow =
+          GenericRecord.create(new org.apache.iceberg.Schema(nestedFieldType));
+      icebergRow.setField("nested_str", "str_value");
+
+      checkRecordValueToRowValue(
+          Types.StructType.of(nestedFieldType), icebergRow, Schema.FieldType.row(schema), beamRow);
+    }
+
+    @Test
+    public void testMap() {
+      Map<String, Integer> map =
+          ImmutableMap.<String, Integer>builder().put("a", 123).put("b", 456).put("c", 789).build();
+
+      checkRecordValueToRowValue(
+          Types.MapType.ofRequired(1, 2, Types.StringType.get(), Types.IntegerType.get()),
+          Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32),
+          map);
+    }
+
+    @Test
+    public void testList() {
+      List<String> list = Arrays.asList("abc", "xyz", "123", "foo", "bar");
+
+      checkRecordValueToRowValue(
+          Types.ListType.ofRequired(1, Types.StringType.get()),
+          Schema.FieldType.iterable(Schema.FieldType.STRING),
+          list);
+    }
+  }
+
+  /** Tests conversions between Beam schemas/field types and Iceberg schemas/types. */
+  @RunWith(JUnit4.class)
+  public static class SchemaTests {
+    /** One Beam-to-Iceberg field type conversion case together with its expected outcome. */
+    private static class BeamFieldTypeTestCase {
+      /** Field ID handed to {@code beamFieldTypeToIcebergFieldType}. */
+      final int icebergFieldId;
+      /** Beam type to convert. */
+      final Schema.FieldType beamType;
+      /** Expected {@code maxId} of the conversion result. */
+      final int expectedMaxId;
+      /** Expected Iceberg type of the conversion result. */
+      final Type expectedIcebergType;
+
+      BeamFieldTypeTestCase(
+          int icebergFieldId,
+          Schema.FieldType beamType,
+          int expectedMaxId,
+          Type expectedIcebergType) {
+        this.icebergFieldId = icebergFieldId;
+        this.beamType = beamType;
+        this.expectedMaxId = expectedMaxId;
+        this.expectedIcebergType = expectedIcebergType;
+      }
+    }
+
+    /** Converts every case's Beam type and asserts both the resulting max ID and Iceberg type. */
+    private void checkTypes(List<BeamFieldTypeTestCase> testCases) {
+      for (BeamFieldTypeTestCase testCase : testCases) {
+        ObjectAndMaxId<Type> ret =
+            beamFieldTypeToIcebergFieldType(testCase.icebergFieldId, testCase.beamType);
+
+        assertEquals(testCase.expectedMaxId, ret.maxId);
+        checkEquals(testCase.expectedIcebergType, ret.object);
+      }
+    }
+
+    /**
+     * Asserts that two Iceberg types match. When both are list types only their element types are
+     * compared (recursively), so the element field IDs of the lists themselves are not checked;
+     * every other type is compared with {@code assertEquals}.
+     */
+    private void checkEquals(Type expected, Type actual) {
+      if (expected.isListType() && actual.isListType()) {
+        Type nestedExpected = expected.asListType().elementType();
+        Type nestedActual = actual.asListType().elementType();
+
+        assertEquals(nestedExpected.typeId(), nestedActual.typeId());
+        checkEquals(nestedExpected, nestedActual);
+      } else {
+        assertEquals(expected, actual);
+      }
+    }
+
+    @Test
+    public void testPrimitiveBeamFieldTypeToIcebergFieldType() {
+      List<BeamFieldTypeTestCase> primitives =
+          Arrays.asList(
+              new BeamFieldTypeTestCase(1, Schema.FieldType.BOOLEAN, 1, Types.BooleanType.get()),
+              new BeamFieldTypeTestCase(3, Schema.FieldType.INT32, 3, Types.IntegerType.get()),
+              new BeamFieldTypeTestCase(6, Schema.FieldType.INT64, 6, Types.LongType.get()),
+              new BeamFieldTypeTestCase(10, Schema.FieldType.FLOAT, 10, Types.FloatType.get()),
+              new BeamFieldTypeTestCase(7, Schema.FieldType.DOUBLE, 7, Types.DoubleType.get()),
+              new BeamFieldTypeTestCase(11, Schema.FieldType.STRING, 11, Types.StringType.get()),
+              new BeamFieldTypeTestCase(15, Schema.FieldType.BYTES, 15, Types.BinaryType.get()));
+
+      checkTypes(primitives);
+    }
+
+    @Test
+    public void testArrayBeamFieldTypeToIcebergFieldType() {
+      // Iceberg sets one field ID for the List type itself and another field ID for the collection
+      // type.
+      List<BeamFieldTypeTestCase> listTypes =
+          Arrays.asList(
+              new BeamFieldTypeTestCase(
+                  1,
+                  Schema.FieldType.array(Schema.FieldType.BOOLEAN),
+                  2,
+                  Types.ListType.ofRequired(1, Types.BooleanType.get())),
+              new BeamFieldTypeTestCase(
+                  3,
+                  Schema.FieldType.iterable(Schema.FieldType.INT32),
+                  4,
+                  Types.ListType.ofRequired(3, Types.IntegerType.get())),
+              new BeamFieldTypeTestCase(
+                  6,
+                  Schema.FieldType.array(Schema.FieldType.INT64),
+                  7,
+                  Types.ListType.ofRequired(6, Types.LongType.get())),
+              new BeamFieldTypeTestCase(
+                  10,
+                  Schema.FieldType.array(Schema.FieldType.FLOAT),
+                  11,
+                  Types.ListType.ofRequired(10, Types.FloatType.get())),
+              new BeamFieldTypeTestCase(
+                  7,
+                  Schema.FieldType.iterable(Schema.FieldType.DOUBLE),
+                  8,
+                  Types.ListType.ofRequired(7, Types.DoubleType.get())),
+              new BeamFieldTypeTestCase(
+                  11,
+                  Schema.FieldType.array(Schema.FieldType.STRING),
+                  12,
+                  Types.ListType.ofRequired(11, Types.StringType.get())),
+              new BeamFieldTypeTestCase(
+                  15,
+                  Schema.FieldType.iterable(Schema.FieldType.BYTES),
+                  16,
+                  Types.ListType.ofRequired(15, Types.BinaryType.get())),
+              new BeamFieldTypeTestCase(
+                  23,
+                  Schema.FieldType.array(
+                      Schema.FieldType.array(Schema.FieldType.iterable(Schema.FieldType.STRING))),
+                  26,
+                  Types.ListType.ofRequired(
+                      23,
+                      Types.ListType.ofRequired(
+                          24, Types.ListType.ofRequired(25, Types.StringType.get())))));
+
+      checkTypes(listTypes);
+    }
+
+    @Test
+    public void testStructBeamFieldTypeToIcebergFieldType() {
+      // Iceberg sets one field ID for each nested type.
+      List<BeamFieldTypeTestCase> structTypes =
+          Arrays.asList(
+              new BeamFieldTypeTestCase(
+                  1,
+                  Schema.FieldType.row(Schema.builder().addStringField("str").build()),
+                  2,
+                  Types.StructType.of(
+                      Types.NestedField.required(2, "str", Types.StringType.get()))),
+              new BeamFieldTypeTestCase(
+                  3,
+                  Schema.FieldType.row(Schema.builder().addInt32Field("int").build()),
+                  4,
+                  Types.StructType.of(
+                      Types.NestedField.required(4, "int", Types.IntegerType.get()))),
+              new BeamFieldTypeTestCase(
+                  0,
+                  Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE),
+                  7,
+                  Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())),
+              new BeamFieldTypeTestCase(
+                  15,
+                  Schema.FieldType.row(
+                      Schema.builder()
+                          .addArrayField("arr", Schema.FieldType.STRING)
+                          .addNullableStringField("str")
+                          .build()),
+                  18,
+                  Types.StructType.of(
+                      Types.NestedField.required(
+                          16, "arr", Types.ListType.ofRequired(17, Types.StringType.get())),
+                      Types.NestedField.optional(18, "str", Types.StringType.get()))),
+              new BeamFieldTypeTestCase(
+                  20,
+                  Schema.FieldType.row(
+                      Schema.builder()
+                          .addRowField(
+                              "row",
+                              Schema.builder()
+                                  .addRowField(
+                                      "nested_row", Schema.builder().addStringField("str").build())
+                                  .build())
+                          .addNullableRowField(
+                              "nullable_row", Schema.builder().addInt64Field("long").build())
+                          .build()),
+                  25,
+                  Types.StructType.of(
+                      Types.NestedField.required(
+                          21,
+                          "row",
+                          Types.StructType.of(
+                              Types.NestedField.required(
+                                  22,
+                                  "nested_row",
+                                  Types.StructType.of(
+                                      Types.NestedField.required(
+                                          23, "str", Types.StringType.get()))))),
+                      Types.NestedField.optional(
+                          24,
+                          "nullable_row",
+                          Types.StructType.of(
+                              Types.NestedField.required(25, "long", Types.LongType.get()))))));
+
+      checkTypes(structTypes);
+    }
+
+    @Test
+    public void testMapBeamFieldTypeToIcebergFieldType() {
+      List<BeamFieldTypeTestCase> mapTypes =
+          Arrays.asList(
+              new BeamFieldTypeTestCase(
+                  1,
+                  Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32),
+                  3,
+                  Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.IntegerType.get())),
+              new BeamFieldTypeTestCase(
+                  6,
+                  Schema.FieldType.map(
+                      Schema.FieldType.FLOAT, Schema.FieldType.array(Schema.FieldType.STRING)),
+                  9,
+                  Types.MapType.ofRequired(
+                      7,
+                      8,
+                      Types.FloatType.get(),
+                      Types.ListType.ofRequired(9, Types.StringType.get()))),
+              new BeamFieldTypeTestCase(
+                  10,
+                  Schema.FieldType.map(
+                      Schema.FieldType.STRING,
+                      Schema.FieldType.map(
+                          Schema.FieldType.BOOLEAN,
+                          Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32))),
+                  16,
+                  Types.MapType.ofRequired(
+                      11,
+                      12,
+                      Types.StringType.get(),
+                      Types.MapType.ofRequired(
+                          13,
+                          14,
+                          Types.BooleanType.get(),
+                          Types.MapType.ofRequired(
+                              15, 16, Types.StringType.get(), Types.IntegerType.get())))),
+              new BeamFieldTypeTestCase(
+                  15,
+                  Schema.FieldType.map(
+                      Schema.FieldType.row(Schema.builder().addStringField("str").build()),
+                      Schema.FieldType.row(Schema.builder().addInt32Field("int").build())),
+                  19,
+                  Types.MapType.ofRequired(
+                      16,
+                      17,
+                      Types.StructType.of(
+                          Types.NestedField.required(18, "str", Types.StringType.get())),
+                      Types.StructType.of(
+                          Types.NestedField.required(19, "int", Types.IntegerType.get())))));
+
+      checkTypes(mapTypes);
+    }
+
+    /** Beam schema of primitive fields; counterpart of {@link #ICEBERG_SCHEMA_PRIMITIVE}. */
+    static final Schema BEAM_SCHEMA_PRIMITIVE =
+        Schema.builder()
+            .addInt32Field("int")
+            .addFloatField("float")
+            .addNullableDoubleField("double")
+            .addInt64Field("long")
+            .addNullableStringField("str")
+            .addNullableBooleanField("bool")
+            .addByteArrayField("bytes")
+            .build();
+
+    /** Iceberg schema of primitive fields; counterpart of {@link #BEAM_SCHEMA_PRIMITIVE}. */
+    static final org.apache.iceberg.Schema ICEBERG_SCHEMA_PRIMITIVE =
+        new org.apache.iceberg.Schema(
+            required(1, "int", Types.IntegerType.get()),
+            required(2, "float", Types.FloatType.get()),
+            optional(3, "double", Types.DoubleType.get()),
+            required(4, "long", Types.LongType.get()),
+            optional(5, "str", Types.StringType.get()),
+            optional(6, "bool", Types.BooleanType.get()),
+            required(7, "bytes", Types.BinaryType.get()));
+
+    @Test
+    public void testPrimitiveBeamSchemaToIcebergSchema() {
+      org.apache.iceberg.Schema convertedIcebergSchema =
+          IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_PRIMITIVE);
+
+      assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_PRIMITIVE));
+    }
+
+    @Test
+    public void testPrimitiveIcebergSchemaToBeamSchema() {
+      Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_PRIMITIVE);
+
+      assertEquals(BEAM_SCHEMA_PRIMITIVE, convertedBeamSchema);
+    }
+
+    /** Beam schema of iterable fields; counterpart of {@link #ICEBERG_SCHEMA_LIST}. */
+    static final Schema BEAM_SCHEMA_LIST =
+        Schema.builder()
+            .addIterableField("arr_str", Schema.FieldType.STRING)
+            .addIterableField("arr_int", Schema.FieldType.INT32)
+            .addIterableField("arr_bool", Schema.FieldType.BOOLEAN)
+            .build();
+
+    /** Iceberg schema of list fields; counterpart of {@link #BEAM_SCHEMA_LIST}. */
+    static final org.apache.iceberg.Schema ICEBERG_SCHEMA_LIST =
+        new org.apache.iceberg.Schema(
+            required(1, "arr_str", Types.ListType.ofRequired(2, Types.StringType.get())),
+            required(3, "arr_int", Types.ListType.ofRequired(4, Types.IntegerType.get())),
+            required(5, "arr_bool", Types.ListType.ofRequired(6, Types.BooleanType.get())));
+
+    @Test
+    public void testArrayBeamSchemaToIcebergSchema() {
+      org.apache.iceberg.Schema convertedIcebergSchema =
+          IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_LIST);
+
+      assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_LIST));
+    }
+
+    @Test
+    public void testArrayIcebergSchemaToBeamSchema() {
+      Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_LIST);
+
+      assertEquals(BEAM_SCHEMA_LIST, convertedBeamSchema);
+    }
+
+    /** Beam schema of map fields; counterpart of {@link #ICEBERG_SCHEMA_MAP}. */
+    static final Schema BEAM_SCHEMA_MAP =
+        Schema.builder()
+            .addMapField("str_int", Schema.FieldType.STRING, Schema.FieldType.INT32)
+            .addNullableMapField("long_bool", Schema.FieldType.INT64, Schema.FieldType.BOOLEAN)
+            .build();
+
+    /** Iceberg schema of map fields; counterpart of {@link #BEAM_SCHEMA_MAP}. */
+    static final org.apache.iceberg.Schema ICEBERG_SCHEMA_MAP =
+        new org.apache.iceberg.Schema(
+            required(
+                1,
+                "str_int",
+                Types.MapType.ofRequired(2, 3, Types.StringType.get(), Types.IntegerType.get())),
+            optional(
+                4,
+                "long_bool",
+                Types.MapType.ofRequired(5, 6, Types.LongType.get(), Types.BooleanType.get())));
+
+    @Test
+    public void testMapBeamSchemaToIcebergSchema() {
+      org.apache.iceberg.Schema convertedIcebergSchema =
+          IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_MAP);
+
+      assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_MAP));
+    }
+
+    @Test
+    public void testMapIcebergSchemaToBeamSchema() {
+      Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_MAP);
+
+      assertEquals(BEAM_SCHEMA_MAP, convertedBeamSchema);
+    }
+
+    /** Beam schema of nested row fields; counterpart of {@link #ICEBERG_SCHEMA_STRUCT}. */
+    static final Schema BEAM_SCHEMA_STRUCT =
+        Schema.builder()
+            .addRowField(
+                "row",
+                Schema.builder()
+                    .addStringField("str")
+                    .addNullableInt32Field("int")
+                    .addInt64Field("long")
+                    .build())
+            .addNullableRowField(
+                "nullable_row",
+                Schema.builder().addNullableStringField("str").addBooleanField("bool").build())
+            .build();
+
+    /** Iceberg schema of nested struct fields; counterpart of {@link #BEAM_SCHEMA_STRUCT}. */
+    static final org.apache.iceberg.Schema ICEBERG_SCHEMA_STRUCT =
+        new org.apache.iceberg.Schema(
+            required(
+                1,
+                "row",
+                Types.StructType.of(
+                    required(2, "str", Types.StringType.get()),
+                    optional(3, "int", Types.IntegerType.get()),
+                    required(4, "long", Types.LongType.get()))),
+            optional(
+                5,
+                "nullable_row",
+                Types.StructType.of(
+                    optional(6, "str", Types.StringType.get()),
+                    required(7, "bool", Types.BooleanType.get()))));
+
+    @Test
+    public void testStructBeamSchemaToIcebergSchema() {
+      org.apache.iceberg.Schema convertedIcebergSchema =
+          IcebergUtils.beamSchemaToIcebergSchema(BEAM_SCHEMA_STRUCT);
+
+      assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA_STRUCT));
+    }
+
+    @Test
+    public void testStructIcebergSchemaToBeamSchema() {
+      Schema convertedBeamSchema = IcebergUtils.icebergSchemaToBeamSchema(ICEBERG_SCHEMA_STRUCT);
+
+      assertEquals(BEAM_SCHEMA_STRUCT, convertedBeamSchema);
+    }
+  }
+}
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 75884f4bcf7..a2cd64e2395 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -102,8 +102,7 @@ public class IcebergWriteSchemaTransformProviderTest {
             testPipeline
                 .apply(
                     "Records To Add", 
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
-                .setRowSchema(
-                    
SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
+                
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)));
 
     PCollection<Row> result =
         input
@@ -137,7 +136,7 @@ public class IcebergWriteSchemaTransformProviderTest {
     PCollection<Row> inputRows =
         testPipeline
             .apply("Records To Add", 
Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
-            
.setRowSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA));
+            
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA));
     PCollection<Row> result =
         
inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(OUTPUT_TAG);
 
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
index 143687e3c99..007cb028c66 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ScanSourceTest.java
@@ -79,7 +79,7 @@ public class ScanSourceTest {
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", 
"").split("\\."))
-                
.setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+                
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
                 .build());
 
     BoundedSource.BoundedReader<Row> reader = source.createReader(options);
@@ -121,7 +121,7 @@ public class ScanSourceTest {
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", 
"").split("\\."))
-                
.setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+                
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
                 .build());
 
     // Input data for this test is tiny so try a number of very small split 
sizes
@@ -167,7 +167,7 @@ public class ScanSourceTest {
                         .build())
                 .setScanType(IcebergScanConfig.ScanType.TABLE)
                 .setTableIdentifier(simpleTable.name().replace("hadoop.", 
"").split("\\."))
-                
.setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
+                
.setSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
                 .build());
 
     // Input data for this test is tiny so make sure to split and get a few, 
but so they can be
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java
deleted file mode 100644
index 5c708700a17..00000000000
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversionsTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.ByteBuffer;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.values.Row;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.Types;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(Enclosed.class)
-public class SchemaAndRowConversionsTest {
-
-  @RunWith(JUnit4.class)
-  public static class RowToRecordTests {
-    /**
-     * Checks a value that when converted to Iceberg type is the same value 
when interpreted in
-     * Java.
-     */
-    private void checkRowValueToRecordValue(
-        Schema.FieldType sourceType, Type destType, Object value) {
-      checkRowValueToRecordValue(sourceType, value, destType, value);
-    }
-
-    private void checkRowValueToRecordValue(
-        Schema.FieldType sourceType, Object sourceValue, Type destType, Object 
destValue) {
-      Schema beamSchema = Schema.of(Schema.Field.of("v", sourceType));
-      Row row = Row.withSchema(beamSchema).addValues(sourceValue).build();
-
-      org.apache.iceberg.Schema icebergSchema =
-          new org.apache.iceberg.Schema(required(0, "v", destType));
-      Record record = SchemaAndRowConversions.rowToRecord(icebergSchema, row);
-
-      assertThat(record.getField("v"), equalTo(destValue));
-    }
-
-    @Test
-    public void testBoolean() throws Exception {
-      checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, 
Types.BooleanType.get(), true);
-      checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, 
Types.BooleanType.get(), false);
-    }
-
-    @Test
-    public void testInteger() throws Exception {
-      checkRowValueToRecordValue(Schema.FieldType.INT32, 
Types.IntegerType.get(), -13);
-      checkRowValueToRecordValue(Schema.FieldType.INT32, 
Types.IntegerType.get(), 42);
-      checkRowValueToRecordValue(Schema.FieldType.INT32, 
Types.IntegerType.get(), 0);
-    }
-
-    @Test
-    public void testLong() throws Exception {
-      checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 
13L);
-      checkRowValueToRecordValue(Schema.FieldType.INT64, Types.LongType.get(), 
42L);
-    }
-
-    @Test
-    public void testFloat() throws Exception {
-      checkRowValueToRecordValue(Schema.FieldType.FLOAT, 
Types.FloatType.get(), 3.14159f);
-      checkRowValueToRecordValue(Schema.FieldType.FLOAT, 
Types.FloatType.get(), 42.0f);
-    }
-
-    @Test
-    public void testDouble() throws Exception {
-      checkRowValueToRecordValue(Schema.FieldType.DOUBLE, 
Types.DoubleType.get(), 3.14159);
-    }
-
-    @Test
-    public void testDate() throws Exception {}
-
-    @Test
-    public void testTime() throws Exception {}
-
-    @Test
-    public void testTimestamp() throws Exception {
-      DateTime dateTime =
-          new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 
4).withZone(DateTimeZone.UTC);
-
-      checkRowValueToRecordValue(
-          Schema.FieldType.DATETIME,
-          dateTime.toInstant(),
-          Types.TimestampType.withoutZone(),
-          dateTime.getMillis());
-    }
-
-    @Test
-    public void testFixed() throws Exception {}
-
-    @Test
-    public void testBinary() throws Exception {
-      byte[] bytes = new byte[] {1, 2, 3, 4};
-      checkRowValueToRecordValue(
-          Schema.FieldType.BYTES, bytes, Types.BinaryType.get(), 
ByteBuffer.wrap(bytes));
-    }
-
-    @Test
-    public void testDecimal() throws Exception {}
-
-    @Test
-    public void testStruct() throws Exception {}
-
-    @Test
-    public void testMap() throws Exception {}
-
-    @Test
-    public void testList() throws Exception {}
-  }
-
-  @RunWith(JUnit4.class)
-  public static class RecordToRowTests {
-    private void checkRecordValueToRowValue(
-        Type sourceType, Schema.FieldType destType, Object value) {
-      checkRecordValueToRowValue(sourceType, value, destType, value);
-    }
-
-    private void checkRecordValueToRowValue(
-        Type sourceType, Object sourceValue, Schema.FieldType destType, Object 
destValue) {
-      Schema beamSchema = Schema.of(Schema.Field.of("v", destType));
-
-      org.apache.iceberg.Schema icebergSchema =
-          new org.apache.iceberg.Schema(required(0, "v", sourceType));
-      Record record = GenericRecord.create(icebergSchema);
-      record.setField("v", sourceValue);
-
-      Row row = SchemaAndRowConversions.recordToRow(beamSchema, record);
-
-      assertThat(row.getBaseValue("v"), equalTo(destValue));
-    }
-
-    @Test
-    public void testBoolean() throws Exception {
-      checkRecordValueToRowValue(Types.BooleanType.get(), 
Schema.FieldType.BOOLEAN, true);
-      checkRecordValueToRowValue(Types.BooleanType.get(), 
Schema.FieldType.BOOLEAN, false);
-    }
-
-    @Test
-    public void testInteger() throws Exception {
-      checkRecordValueToRowValue(Types.IntegerType.get(), 
Schema.FieldType.INT32, -13);
-      checkRecordValueToRowValue(Types.IntegerType.get(), 
Schema.FieldType.INT32, 42);
-      checkRecordValueToRowValue(Types.IntegerType.get(), 
Schema.FieldType.INT32, 0);
-    }
-
-    @Test
-    public void testLong() throws Exception {
-      checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 
13L);
-      checkRecordValueToRowValue(Types.LongType.get(), Schema.FieldType.INT64, 
42L);
-    }
-
-    @Test
-    public void testFloat() throws Exception {
-      checkRecordValueToRowValue(Types.FloatType.get(), 
Schema.FieldType.FLOAT, 3.14159f);
-      checkRecordValueToRowValue(Types.FloatType.get(), 
Schema.FieldType.FLOAT, 42.0f);
-    }
-
-    @Test
-    public void testDouble() throws Exception {
-      checkRecordValueToRowValue(Types.DoubleType.get(), 
Schema.FieldType.DOUBLE, 3.14159);
-    }
-
-    @Test
-    public void testDate() throws Exception {}
-
-    @Test
-    public void testTime() throws Exception {}
-
-    @Test
-    public void testTimestamp() throws Exception {
-      DateTime dateTime =
-          new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 
4).withZone(DateTimeZone.UTC);
-
-      checkRecordValueToRowValue(
-          Types.TimestampType.withoutZone(),
-          dateTime.getMillis(),
-          Schema.FieldType.DATETIME,
-          dateTime.toInstant());
-    }
-
-    @Test
-    public void testFixed() throws Exception {}
-
-    @Test
-    public void testBinary() throws Exception {
-      byte[] bytes = new byte[] {1, 2, 3, 4};
-      checkRecordValueToRowValue(
-          Types.BinaryType.get(), ByteBuffer.wrap(bytes), 
Schema.FieldType.BYTES, bytes);
-    }
-
-    @Test
-    public void testDecimal() throws Exception {}
-
-    @Test
-    public void testStruct() throws Exception {}
-
-    @Test
-    public void testMap() throws Exception {}
-
-    @Test
-    public void testList() throws Exception {}
-  }
-
-  @RunWith(JUnit4.class)
-  public static class SchemaTests {
-    static final Schema BEAM_SCHEMA =
-        Schema.builder()
-            .addInt32Field("int")
-            .addFloatField("float")
-            .addDoubleField("double")
-            .addInt64Field("long")
-            .addStringField("str")
-            .addBooleanField("bool")
-            .addByteArrayField("bytes")
-            .build();
-
-    static final org.apache.iceberg.Schema ICEBERG_SCHEMA =
-        new org.apache.iceberg.Schema(
-            Types.NestedField.required(1, "int", Types.IntegerType.get()),
-            Types.NestedField.required(2, "float", Types.FloatType.get()),
-            Types.NestedField.required(3, "double", Types.DoubleType.get()),
-            Types.NestedField.required(4, "long", Types.LongType.get()),
-            Types.NestedField.required(5, "str", Types.StringType.get()),
-            Types.NestedField.required(6, "bool", Types.BooleanType.get()),
-            Types.NestedField.required(7, "bytes", Types.BinaryType.get()));
-
-    @Test
-    public void testBeamSchemaToIcebergSchema() {
-      org.apache.iceberg.Schema convertedIcebergSchema =
-          SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA);
-
-      assertTrue(convertedIcebergSchema.sameSchema(ICEBERG_SCHEMA));
-    }
-
-    @Test
-    public void testIcebergSchemaToBeamSchema() {
-      Schema convertedBeamSchema =
-          SchemaAndRowConversions.icebergSchemaToBeamSchema(ICEBERG_SCHEMA);
-
-      assertEquals(BEAM_SCHEMA, convertedBeamSchema);
-    }
-  }
-}
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java
index 4048e88398a..6143bd03491 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestFixtures.java
@@ -91,7 +91,7 @@ public class TestFixtures {
     ArrayList<Row> rows = new ArrayList<>();
     for (Record record : records) {
       rows.add(
-          
Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(SCHEMA))
+          Row.withSchema(IcebergUtils.icebergSchemaToBeamSchema(SCHEMA))
               .withFieldValue("id", record.getField("id"))
               .withFieldValue("data", record.getField("data"))
               .build());

Reply via email to