This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b54967eab41 Fix Beam Schema to Iceberg Schema ID conversion logic 
(#32095)
b54967eab41 is described below

commit b54967eab41f51e9329833d5e2ac18ee522c151c
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Aug 7 13:02:29 2024 -0400

    Fix Beam Schema to Iceberg Schema ID conversion logic (#32095)
    
    * fix iceberg schema ID logic
    
    * trigger integration tests
---
 .../IO_Iceberg_Integration_Tests.json              |   2 +-
 .../apache/beam/sdk/io/iceberg/IcebergUtils.java   | 181 ++++++++++-----------
 .../beam/sdk/io/iceberg/IcebergUtilsTest.java      | 113 ++++++-------
 3 files changed, 148 insertions(+), 148 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json 
b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 3f63c0c9975..bbdc3a3910e 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-    "modification": 2
+    "modification": 3
 }
diff --git 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
index a2f84e6475c..acd9b25a6a5 100644
--- 
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
+++ 
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
@@ -115,113 +116,110 @@ public class IcebergUtils {
   }
 
   /**
-   * Represents an Object (in practice, either {@link Type} or {@link 
Types.NestedField}) along with
-   * the most recent (max) ID that has been used to build this object.
+   * Represents a {@link Type} and the most recent field ID used to build it.
    *
    * <p>Iceberg Schema fields are required to have unique IDs. This includes 
unique IDs for a {@link
-   * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and 
value type, and
-   * nested {@link Types.StructType}s. When constructing any of these types, 
we use multiple unique
-   * ID's for the type's components. The {@code maxId} in this object 
represents the most recent ID
-   * used after building this type. This helps signal that the next field we 
construct should have
-   * an ID greater than this one.
+   * org.apache.iceberg.types.Type.NestedType}'s components (e.g. {@link 
Types.ListType}'s
+   * collection type, {@link Types.MapType}'s key type and value type, and 
{@link
+   * Types.StructType}'s nested fields). The {@code maxId} in this object 
represents the most recent
+   * ID used after building this type. This helps signal that the next {@link
+   * org.apache.iceberg.types.Type.NestedType} we construct should have an ID 
greater than this one.
    */
   @VisibleForTesting
-  static class ObjectAndMaxId<T> {
+  static class TypeAndMaxId {
     int maxId;
-    T object;
+    Type type;
 
-    ObjectAndMaxId(int id, T object) {
+    TypeAndMaxId(int id, Type object) {
       this.maxId = id;
-      this.object = object;
+      this.type = object;
     }
   }
 
   /**
-   * Given a Beam {@link Schema.FieldType} and an index, returns an Iceberg 
{@link Type} and the
-   * maximum index after building the Iceberg Type. This assumes the input 
index is already in use
-   * (usually by the parent {@link Types.NestedField}, and will start building 
the Iceberg type from
-   * index + 1.
+   * Takes a Beam {@link Schema.FieldType} and an index intended as a starting 
point for Iceberg
+   * {@link org.apache.iceberg.types.Type.NestedType}s. Returns an Iceberg 
{@link Type} and the
+   * maximum index after building that type.
    *
-   * <p>Returns this information in an {@link ObjectAndMaxId<Type>} instance.
+   * <p>Returns this information in a {@link TypeAndMaxId} object.
    */
   @VisibleForTesting
-  static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
-      int fieldId, Schema.FieldType beamType) {
+  static TypeAndMaxId beamFieldTypeToIcebergFieldType(
+      Schema.FieldType beamType, int nestedFieldId) {
     if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
-      return new ObjectAndMaxId<>(fieldId, 
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+      // we don't use nested field ID for primitive types. decrement it so the 
caller can use it for
+      // other types.
+      return new TypeAndMaxId(
+          --nestedFieldId, 
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
     } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or 
ITERABLE
-      // List ID needs to be unique from the NestedField that contains this 
ListType
-      int listId = fieldId + 1;
       Schema.FieldType beamCollectionType =
           
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
 
-      ObjectAndMaxId<Type> listInfo = beamFieldTypeToIcebergFieldType(listId, 
beamCollectionType);
-      Type icebergCollectionType = listInfo.object;
+      // nestedFieldId is reserved for the list's collection type.
+      // we increment here because further nested fields should use unique IDs
+      TypeAndMaxId listInfo =
+          beamFieldTypeToIcebergFieldType(beamCollectionType, nestedFieldId + 
1);
+      Type icebergCollectionType = listInfo.type;
 
       boolean elementTypeIsNullable =
           
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
 
       Type listType =
           elementTypeIsNullable
-              ? Types.ListType.ofOptional(listId, icebergCollectionType)
-              : Types.ListType.ofRequired(listId, icebergCollectionType);
+              ? Types.ListType.ofOptional(nestedFieldId, icebergCollectionType)
+              : Types.ListType.ofRequired(nestedFieldId, 
icebergCollectionType);
 
-      return new ObjectAndMaxId<>(listInfo.maxId, listType);
+      return new TypeAndMaxId(listInfo.maxId, listType);
     } else if (beamType.getTypeName().isMapType()) { // MAP
-      // key and value IDs need to be unique from the NestedField that 
contains this MapType
-      int keyId = fieldId + 1;
-      int valueId = fieldId + 2;
-      int maxId = valueId;
+      // key and value IDs need to be unique
+      int keyId = nestedFieldId;
+      int valueId = keyId + 1;
 
+      // nested field IDs should be unique
+      nestedFieldId = valueId + 1;
       Schema.FieldType beamKeyType = 
Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
-      ObjectAndMaxId<Type> keyInfo = beamFieldTypeToIcebergFieldType(maxId, 
beamKeyType);
-      Type icebergKeyType = keyInfo.object;
-      maxId = keyInfo.maxId;
+      TypeAndMaxId keyInfo = beamFieldTypeToIcebergFieldType(beamKeyType, 
nestedFieldId);
+      Type icebergKeyType = keyInfo.type;
 
+      nestedFieldId = keyInfo.maxId + 1;
       Schema.FieldType beamValueType =
           Preconditions.checkArgumentNotNull(beamType.getMapValueType());
-      ObjectAndMaxId<Type> valueInfo = beamFieldTypeToIcebergFieldType(maxId, 
beamValueType);
-      Type icebergValueType = valueInfo.object;
-      maxId = valueInfo.maxId;
+      TypeAndMaxId valueInfo = beamFieldTypeToIcebergFieldType(beamValueType, 
nestedFieldId);
+      Type icebergValueType = valueInfo.type;
 
       Type mapType =
           beamValueType.getNullable()
               ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, 
icebergValueType)
               : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, 
icebergValueType);
 
-      return new ObjectAndMaxId<>(maxId, mapType);
+      return new TypeAndMaxId(valueInfo.maxId, mapType);
     } else if (beamType.getTypeName().isCompositeType()) { // ROW
       // Nested field IDs need to be unique from the field that contains this 
StructType
-      int maxFieldId = fieldId;
-
       Schema nestedSchema = 
Preconditions.checkArgumentNotNull(beamType.getRowSchema());
       List<Types.NestedField> nestedFields = new 
ArrayList<>(nestedSchema.getFieldCount());
-      for (Schema.Field field : nestedSchema.getFields()) {
-        ObjectAndMaxId<Types.NestedField> converted = 
beamFieldToIcebergField(++maxFieldId, field);
-        Types.NestedField nestedField = converted.object;
-        nestedFields.add(nestedField);
 
-        maxFieldId = converted.maxId;
+      int icebergFieldId = nestedFieldId;
+      nestedFieldId = icebergFieldId + nestedSchema.getFieldCount();
+      for (Schema.Field beamField : nestedSchema.getFields()) {
+        TypeAndMaxId typeAndMaxId =
+            beamFieldTypeToIcebergFieldType(beamField.getType(), 
nestedFieldId);
+        Types.NestedField icebergField =
+            Types.NestedField.of(
+                icebergFieldId++,
+                beamField.getType().getNullable(),
+                beamField.getName(),
+                typeAndMaxId.type);
+
+        nestedFields.add(icebergField);
+        nestedFieldId = typeAndMaxId.maxId + 1;
       }
 
       Type structType = Types.StructType.of(nestedFields);
 
-      return new ObjectAndMaxId<>(maxFieldId, structType);
+      return new TypeAndMaxId(nestedFieldId - 1, structType);
     }
 
-    return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
-  }
-
-  private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
-      int fieldId, final Schema.Field field) {
-    ObjectAndMaxId<Type> typeAndMaxId = 
beamFieldTypeToIcebergFieldType(fieldId, field.getType());
-    Type icebergType = typeAndMaxId.object;
-    int id = typeAndMaxId.maxId;
-
-    Types.NestedField icebergField =
-        Types.NestedField.of(fieldId, field.getType().getNullable(), 
field.getName(), icebergType);
-
-    return new ObjectAndMaxId<>(id, icebergField);
+    return new TypeAndMaxId(nestedFieldId, Types.StringType.get());
   }
 
   /**
@@ -233,18 +231,23 @@ public class IcebergUtils {
    * <li>{@link Schema.TypeName.LOGICAL_TYPE}
    */
   public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final 
Schema schema) {
-    Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
-    int nextIcebergFieldId = 1;
-    for (int i = 0; i < schema.getFieldCount(); i++) {
-      Schema.Field beamField = schema.getField(i);
-      ObjectAndMaxId<Types.NestedField> fieldAndMaxId =
-          beamFieldToIcebergField(nextIcebergFieldId, beamField);
-      Types.NestedField field = fieldAndMaxId.object;
-      fields[i] = field;
-
-      nextIcebergFieldId = fieldAndMaxId.maxId + 1;
+    List<Types.NestedField> fields = new ArrayList<>(schema.getFieldCount());
+    int nestedFieldId = schema.getFieldCount() + 1;
+    int icebergFieldId = 1;
+    for (Schema.Field beamField : schema.getFields()) {
+      TypeAndMaxId typeAndMaxId =
+          beamFieldTypeToIcebergFieldType(beamField.getType(), nestedFieldId);
+      Types.NestedField icebergField =
+          Types.NestedField.of(
+              icebergFieldId++,
+              beamField.getType().getNullable(),
+              beamField.getName(),
+              typeAndMaxId.type);
+
+      fields.add(icebergField);
+      nestedFieldId = typeAndMaxId.maxId + 1;
     }
-    return new org.apache.iceberg.Schema(fields);
+    return new org.apache.iceberg.Schema(fields.toArray(new 
Types.NestedField[fields.size()]));
   }
 
   /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */
@@ -323,27 +326,21 @@ public class IcebergUtils {
   public static Row icebergRecordToBeamRow(Schema schema, Record record) {
     Row.Builder rowBuilder = Row.withSchema(schema);
     for (Schema.Field field : schema.getFields()) {
+      boolean isNullable = field.getType().getNullable();
+      @Nullable Object icebergValue = record.getField(field.getName());
+      if (icebergValue == null) {
+        if (isNullable) {
+          rowBuilder.addValue(null);
+          continue;
+        }
+        throw new RuntimeException(
+            String.format("Received null value for required field '%s'.", 
field.getName()));
+      }
       switch (field.getType().getTypeName()) {
         case BYTE:
-          // I guess allow anything we can cast here
-          byte byteValue = (byte) record.getField(field.getName());
-          rowBuilder.addValue(byteValue);
-          break;
         case INT16:
-          // I guess allow anything we can cast here
-          short shortValue = (short) record.getField(field.getName());
-          rowBuilder.addValue(shortValue);
-          break;
         case INT32:
-          // I guess allow anything we can cast here
-          int intValue = (int) record.getField(field.getName());
-          rowBuilder.addValue(intValue);
-          break;
         case INT64:
-          // I guess allow anything we can cast here
-          long longValue = (long) record.getField(field.getName());
-          rowBuilder.addValue(longValue);
-          break;
         case DECIMAL: // Iceberg and Beam both use BigDecimal
         case FLOAT: // Iceberg and Beam both use float
         case DOUBLE: // Iceberg and Beam both use double
@@ -352,29 +349,31 @@ public class IcebergUtils {
         case ARRAY:
         case ITERABLE:
         case MAP:
-          rowBuilder.addValue(record.getField(field.getName()));
+          rowBuilder.addValue(icebergValue);
           break;
         case DATETIME:
           // Iceberg uses a long for millis; Beam uses joda time DateTime
-          long millis = (long) record.getField(field.getName());
+          long millis = (long) icebergValue;
           rowBuilder.addValue(new DateTime(millis, DateTimeZone.UTC));
           break;
         case BYTES:
           // Iceberg uses ByteBuffer; Beam uses byte[]
-          rowBuilder.addValue(((ByteBuffer) 
record.getField(field.getName())).array());
+          rowBuilder.addValue(((ByteBuffer) icebergValue).array());
           break;
         case ROW:
-          Record nestedRecord = (Record) record.getField(field.getName());
+          Record nestedRecord = (Record) icebergValue;
           Schema nestedSchema =
               checkArgumentNotNull(
                   field.getType().getRowSchema(),
                   "Corrupted schema: Row type did not have associated nested 
schema.");
-          Row nestedRow = icebergRecordToBeamRow(nestedSchema, nestedRecord);
-          rowBuilder.addValue(nestedRow);
+          rowBuilder.addValue(icebergRecordToBeamRow(nestedSchema, 
nestedRecord));
           break;
         case LOGICAL_TYPE:
           throw new UnsupportedOperationException(
               "Cannot convert iceberg field to Beam logical type");
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported Beam type: " + field.getType().getTypeName());
       }
     }
     return rowBuilder.build();
diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
index c4da0b22f4d..a20d5b7c8f5 100644
--- 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
+++ 
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
-import static org.apache.beam.sdk.io.iceberg.IcebergUtils.ObjectAndMaxId;
+import static org.apache.beam.sdk.io.iceberg.IcebergUtils.TypeAndMaxId;
 import static 
org.apache.beam.sdk.io.iceberg.IcebergUtils.beamFieldTypeToIcebergFieldType;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
@@ -316,11 +316,11 @@ public class IcebergUtilsTest {
 
     private void checkTypes(List<BeamFieldTypeTestCase> testCases) {
       for (BeamFieldTypeTestCase testCase : testCases) {
-        ObjectAndMaxId<Type> ret =
-            beamFieldTypeToIcebergFieldType(testCase.icebergFieldId, 
testCase.beamType);
+        TypeAndMaxId ret =
+            beamFieldTypeToIcebergFieldType(testCase.beamType, 
testCase.icebergFieldId);
 
         assertEquals(testCase.expectedMaxId, ret.maxId);
-        checkEquals(testCase.expectedIcebergType, ret.object);
+        checkEquals(testCase.expectedIcebergType, ret.type);
       }
     }
 
@@ -338,65 +338,65 @@ public class IcebergUtilsTest {
 
     @Test
     public void testPrimitiveBeamFieldTypeToIcebergFieldType() {
+      // primitive types don't use the nested field ID
       List<BeamFieldTypeTestCase> primitives =
           Arrays.asList(
-              new BeamFieldTypeTestCase(1, Schema.FieldType.BOOLEAN, 1, 
Types.BooleanType.get()),
-              new BeamFieldTypeTestCase(3, Schema.FieldType.INT32, 3, 
Types.IntegerType.get()),
-              new BeamFieldTypeTestCase(6, Schema.FieldType.INT64, 6, 
Types.LongType.get()),
-              new BeamFieldTypeTestCase(10, Schema.FieldType.FLOAT, 10, 
Types.FloatType.get()),
-              new BeamFieldTypeTestCase(7, Schema.FieldType.DOUBLE, 7, 
Types.DoubleType.get()),
-              new BeamFieldTypeTestCase(11, Schema.FieldType.STRING, 11, 
Types.StringType.get()),
-              new BeamFieldTypeTestCase(15, Schema.FieldType.BYTES, 15, 
Types.BinaryType.get()));
+              new BeamFieldTypeTestCase(1, Schema.FieldType.BOOLEAN, 0, 
Types.BooleanType.get()),
+              new BeamFieldTypeTestCase(3, Schema.FieldType.INT32, 2, 
Types.IntegerType.get()),
+              new BeamFieldTypeTestCase(6, Schema.FieldType.INT64, 5, 
Types.LongType.get()),
+              new BeamFieldTypeTestCase(10, Schema.FieldType.FLOAT, 9, 
Types.FloatType.get()),
+              new BeamFieldTypeTestCase(7, Schema.FieldType.DOUBLE, 6, 
Types.DoubleType.get()),
+              new BeamFieldTypeTestCase(11, Schema.FieldType.STRING, 10, 
Types.StringType.get()),
+              new BeamFieldTypeTestCase(15, Schema.FieldType.BYTES, 14, 
Types.BinaryType.get()));
 
       checkTypes(primitives);
     }
 
     @Test
     public void testArrayBeamFieldTypeToIcebergFieldType() {
-      // Iceberg sets one field ID for the List type itself and another field 
ID for the collection
-      // type.
+      // Iceberg's ListType reserves one nested ID for its element type
       List<BeamFieldTypeTestCase> listTypes =
           Arrays.asList(
               new BeamFieldTypeTestCase(
                   1,
                   Schema.FieldType.array(Schema.FieldType.BOOLEAN),
-                  2,
+                  1,
                   Types.ListType.ofRequired(1, Types.BooleanType.get())),
               new BeamFieldTypeTestCase(
                   3,
                   Schema.FieldType.iterable(Schema.FieldType.INT32),
-                  4,
+                  3,
                   Types.ListType.ofRequired(3, Types.IntegerType.get())),
               new BeamFieldTypeTestCase(
                   6,
                   Schema.FieldType.array(Schema.FieldType.INT64),
-                  7,
+                  6,
                   Types.ListType.ofRequired(6, Types.LongType.get())),
               new BeamFieldTypeTestCase(
                   10,
                   Schema.FieldType.array(Schema.FieldType.FLOAT),
-                  11,
+                  10,
                   Types.ListType.ofRequired(10, Types.FloatType.get())),
               new BeamFieldTypeTestCase(
                   7,
                   Schema.FieldType.iterable(Schema.FieldType.DOUBLE),
-                  8,
+                  7,
                   Types.ListType.ofRequired(7, Types.DoubleType.get())),
               new BeamFieldTypeTestCase(
                   11,
                   Schema.FieldType.array(Schema.FieldType.STRING),
-                  12,
+                  11,
                   Types.ListType.ofRequired(11, Types.StringType.get())),
               new BeamFieldTypeTestCase(
                   15,
                   Schema.FieldType.iterable(Schema.FieldType.BYTES),
-                  16,
+                  15,
                   Types.ListType.ofRequired(15, Types.BinaryType.get())),
               new BeamFieldTypeTestCase(
                   23,
                   Schema.FieldType.array(
                       
Schema.FieldType.array(Schema.FieldType.iterable(Schema.FieldType.STRING))),
-                  26,
+                  25,
                   Types.ListType.ofRequired(
                       23,
                       Types.ListType.ofRequired(
@@ -407,23 +407,23 @@ public class IcebergUtilsTest {
 
     @Test
     public void testStructBeamFieldTypeToIcebergFieldType() {
-      // Iceberg sets one field ID for each nested type.
+      // Iceberg sets one unique field ID for each nested type.
       List<BeamFieldTypeTestCase> listTypes =
           Arrays.asList(
               new BeamFieldTypeTestCase(
                   1,
                   
Schema.FieldType.row(Schema.builder().addStringField("str").build()),
-                  2,
+                  1,
                   Types.StructType.of(
-                      Types.NestedField.required(2, "str", 
Types.StringType.get()))),
+                      Types.NestedField.required(1, "str", 
Types.StringType.get()))),
               new BeamFieldTypeTestCase(
                   3,
                   
Schema.FieldType.row(Schema.builder().addInt32Field("int").build()),
-                  4,
+                  3,
                   Types.StructType.of(
-                      Types.NestedField.required(4, "int", 
Types.IntegerType.get()))),
+                      Types.NestedField.required(3, "int", 
Types.IntegerType.get()))),
               new BeamFieldTypeTestCase(
-                  0,
+                  1,
                   Schema.FieldType.row(BEAM_SCHEMA_PRIMITIVE),
                   7,
                   Types.StructType.of(ICEBERG_SCHEMA_PRIMITIVE.columns())),
@@ -434,11 +434,11 @@ public class IcebergUtilsTest {
                           .addArrayField("arr", Schema.FieldType.STRING)
                           .addNullableStringField("str")
                           .build()),
-                  18,
+                  17,
                   Types.StructType.of(
                       Types.NestedField.required(
-                          16, "arr", Types.ListType.ofRequired(17, 
Types.StringType.get())),
-                      Types.NestedField.optional(18, "str", 
Types.StringType.get()))),
+                          15, "arr", Types.ListType.ofRequired(17, 
Types.StringType.get())),
+                      Types.NestedField.optional(16, "str", 
Types.StringType.get()))),
               new BeamFieldTypeTestCase(
                   20,
                   Schema.FieldType.row(
@@ -452,10 +452,10 @@ public class IcebergUtilsTest {
                           .addNullableRowField(
                               "nullable_row", 
Schema.builder().addInt64Field("long").build())
                           .build()),
-                  25,
+                  24,
                   Types.StructType.of(
                       Types.NestedField.required(
-                          21,
+                          20,
                           "row",
                           Types.StructType.of(
                               Types.NestedField.required(
@@ -465,33 +465,34 @@ public class IcebergUtilsTest {
                                       Types.NestedField.required(
                                           23, "str", 
Types.StringType.get()))))),
                       Types.NestedField.optional(
-                          24,
+                          21,
                           "nullable_row",
                           Types.StructType.of(
-                              Types.NestedField.required(25, "long", 
Types.LongType.get()))))));
+                              Types.NestedField.required(24, "long", 
Types.LongType.get()))))));
 
       checkTypes(listTypes);
     }
 
     @Test
     public void testMapBeamFieldTypeToIcebergFieldType() {
+      // Iceberg's MapType reserves two nested IDs: one for its key type and 
one for its value type.
       List<BeamFieldTypeTestCase> primitives =
           Arrays.asList(
               new BeamFieldTypeTestCase(
                   1,
                   Schema.FieldType.map(Schema.FieldType.STRING, 
Schema.FieldType.INT32),
-                  3,
-                  Types.MapType.ofRequired(2, 3, Types.StringType.get(), 
Types.IntegerType.get())),
+                  2,
+                  Types.MapType.ofRequired(1, 2, Types.StringType.get(), 
Types.IntegerType.get())),
               new BeamFieldTypeTestCase(
                   6,
                   Schema.FieldType.map(
                       Schema.FieldType.FLOAT, 
Schema.FieldType.array(Schema.FieldType.STRING)),
-                  9,
+                  8,
                   Types.MapType.ofRequired(
+                      6,
                       7,
-                      8,
                       Types.FloatType.get(),
-                      Types.ListType.ofRequired(9, Types.StringType.get()))),
+                      Types.ListType.ofRequired(8, Types.StringType.get()))),
               new BeamFieldTypeTestCase(
                   10,
                   Schema.FieldType.map(
@@ -499,30 +500,30 @@ public class IcebergUtilsTest {
                       Schema.FieldType.map(
                           Schema.FieldType.BOOLEAN,
                           Schema.FieldType.map(Schema.FieldType.STRING, 
Schema.FieldType.INT32))),
-                  16,
+                  15,
                   Types.MapType.ofRequired(
+                      10,
                       11,
-                      12,
                       Types.StringType.get(),
                       Types.MapType.ofRequired(
+                          12,
                           13,
-                          14,
                           Types.BooleanType.get(),
                           Types.MapType.ofRequired(
-                              15, 16, Types.StringType.get(), 
Types.IntegerType.get())))),
+                              14, 15, Types.StringType.get(), 
Types.IntegerType.get())))),
               new BeamFieldTypeTestCase(
                   15,
                   Schema.FieldType.map(
                       
Schema.FieldType.row(Schema.builder().addStringField("str").build()),
                       
Schema.FieldType.row(Schema.builder().addInt32Field("int").build())),
-                  19,
+                  18,
                   Types.MapType.ofRequired(
+                      15,
                       16,
-                      17,
                       Types.StructType.of(
-                          Types.NestedField.required(18, "str", 
Types.StringType.get())),
+                          Types.NestedField.required(17, "str", 
Types.StringType.get())),
                       Types.StructType.of(
-                          Types.NestedField.required(19, "int", 
Types.IntegerType.get())))));
+                          Types.NestedField.required(18, "int", 
Types.IntegerType.get())))));
 
       checkTypes(primitives);
     }
@@ -574,9 +575,9 @@ public class IcebergUtilsTest {
             .build();
     static final org.apache.iceberg.Schema ICEBERG_SCHEMA_LIST =
         new org.apache.iceberg.Schema(
-            required(1, "arr_str", Types.ListType.ofRequired(2, 
Types.StringType.get())),
-            required(3, "arr_int", Types.ListType.ofRequired(4, 
Types.IntegerType.get())),
-            required(5, "arr_bool", Types.ListType.ofRequired(6, 
Types.BooleanType.get())));
+            required(1, "arr_str", Types.ListType.ofRequired(4, 
Types.StringType.get())),
+            required(2, "arr_int", Types.ListType.ofRequired(5, 
Types.IntegerType.get())),
+            required(3, "arr_bool", Types.ListType.ofRequired(6, 
Types.BooleanType.get())));
 
     @Test
     public void testArrayBeamSchemaToIcebergSchema() {
@@ -607,9 +608,9 @@ public class IcebergUtilsTest {
             required(
                 1,
                 "str_int",
-                Types.MapType.ofRequired(2, 3, Types.StringType.get(), 
Types.IntegerType.get())),
+                Types.MapType.ofRequired(3, 4, Types.StringType.get(), 
Types.IntegerType.get())),
             optional(
-                4,
+                2,
                 "long_bool",
                 Types.MapType.ofRequired(5, 6, Types.LongType.get(), 
Types.BooleanType.get())));
 
@@ -648,11 +649,11 @@ public class IcebergUtilsTest {
                 1,
                 "row",
                 Types.StructType.of(
-                    required(2, "str", Types.StringType.get()),
-                    optional(3, "int", Types.IntegerType.get()),
-                    required(4, "long", Types.LongType.get()))),
+                    required(3, "str", Types.StringType.get()),
+                    optional(4, "int", Types.IntegerType.get()),
+                    required(5, "long", Types.LongType.get()))),
             optional(
-                5,
+                2,
                 "nullable_row",
                 Types.StructType.of(
                     optional(6, "str", Types.StringType.get()),

Reply via email to