ahmedabu98 commented on code in PR #31958:
URL: https://github.com/apache/beam/pull/31958#discussion_r1690449958


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -99,36 +101,128 @@ public static Schema icebergSchemaToBeamSchema(final 
org.apache.iceberg.Schema s
     return builder.build();
   }
 
-  public static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
+  private static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
     Schema.Builder builder = Schema.builder();
     for (Types.NestedField f : struct.fields()) {
       builder.addField(icebergFieldToBeamField(f));
     }
     return builder.build();
   }
 
-  public static Types.NestedField beamFieldToIcebergField(int fieldId, final 
Schema.Field field) {
-    @Nullable Type icebergType = 
BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+  /**
+   * Represents an Object (in practice, either {@link Type} or {@link 
Types.NestedField}) along with
+   * the most recent (max) ID that has been used to build this object.
+   *
+   * <p>Iceberg Schema fields are required to have unique IDs. This includes 
unique IDs for a {@link
+   * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and 
value type, and
+   * nested {@link Types.StructType}s. When constructing any of these types, 
we use multiple unique
+   * IDs for the type's components. The {@code maxId} in this object 
represents the most recent ID
+   * used after building this type. This helps signal that the next field we 
construct should have
+   * an ID greater than this one.
+   */
+  private static class ObjectAndMaxId<T> {
+    int maxId;
+    T object;
 
-    if (icebergType != null) {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
icebergType);
-    } else {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
Types.StringType.get());
+    ObjectAndMaxId(int id, T object) {
+      this.maxId = id;
+      this.object = object;
     }
   }
 
+  private static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+      int fieldId, Schema.FieldType beamType) {
+    if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+      return new ObjectAndMaxId<>(fieldId, 
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+    } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or 
ITERABLE
+      // List ID needs to be unique from the NestedField that contains this 
ListType
+      int listId = fieldId + 1;
+      Schema.FieldType beamCollectionType =
+          
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
+      Type icebergCollectionType =
+          beamFieldTypeToIcebergFieldType(listId, beamCollectionType).object;
+
+      boolean elementTypeIsNullable =
+          
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
+
+      Type listType =
+          elementTypeIsNullable
+              ? Types.ListType.ofOptional(listId, icebergCollectionType)
+              : Types.ListType.ofRequired(listId, icebergCollectionType);
+
+      return new ObjectAndMaxId<>(listId, listType);
+    } else if (beamType.getTypeName().isMapType()) { // MAP
+      // key and value IDs need to be unique from the NestedField that 
contains this MapType
+      int keyId = fieldId + 1;
+      int valueId = fieldId + 2;
+
+      Schema.FieldType beamKeyType = 
Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
+      Schema.FieldType beamValueType =
+          Preconditions.checkArgumentNotNull(beamType.getMapValueType());
+
+      Type icebergKeyType = beamFieldTypeToIcebergFieldType(keyId, 
beamKeyType).object;
+      Type icebergValueType = beamFieldTypeToIcebergFieldType(valueId, 
beamValueType).object;
+
+      Type mapType =
+          beamValueType.getNullable()
+              ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, 
icebergValueType)
+              : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, 
icebergValueType);
+
+      return new ObjectAndMaxId<>(valueId, mapType);
+    } else if (beamType.getTypeName().isCompositeType()) { // ROW
+      // Nested field IDs need to be unique from the field that contains this 
StructType
+      int nestedFieldId = fieldId;
+
+      Schema nestedSchema = 
Preconditions.checkArgumentNotNull(beamType.getRowSchema());
+      List<Types.NestedField> nestedFields = new 
ArrayList<>(nestedSchema.getFieldCount());
+      for (Schema.Field field : nestedSchema.getFields()) {
+        Types.NestedField nestedField = 
beamFieldToIcebergField(++nestedFieldId, field).object;
+        nestedFields.add(nestedField);
+      }
+
+      Type structType = Types.StructType.of(nestedFields);
+
+      return new ObjectAndMaxId<>(nestedFieldId, structType);
+    }
+
+    return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
+  }
+
+  private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
+      int fieldId, final Schema.Field field) {
+    ObjectAndMaxId<Type> typeAndMaxId = 
beamFieldTypeToIcebergFieldType(fieldId, field.getType());
+    Type icebergType = typeAndMaxId.object;
+    int id = typeAndMaxId.maxId;
+
+    Types.NestedField icebergField =
+        Types.NestedField.of(fieldId, field.getType().getNullable(), 
field.getName(), icebergType);
+
+    return new ObjectAndMaxId<>(id, icebergField);
+  }
+
+  /**
+   * Converts a Beam {@link Schema} to an Iceberg {@link 
org.apache.iceberg.Schema}.
+   *
+   * <p>The following unsupported Beam types will be defaulted to {@link 
Types.StringType}:
+   * <li>{@link Schema.TypeName#DECIMAL}
+   * <li>{@link Schema.TypeName#DATETIME}
+   * <li>{@link Schema.TypeName#LOGICAL_TYPE}
+   */
   public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final 
Schema schema) {
     Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
-    int fieldId = 0;
-    for (Schema.Field f : schema.getFields()) {
-      fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
+    int nextId = 1;

Review Comment:
   Thanks, changed it to `nextIcebergFieldId`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to