Abacn commented on code in PR #31958:
URL: https://github.com/apache/beam/pull/31958#discussion_r1690165473


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -20,36 +20,38 @@
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.Row;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
-class SchemaAndRowConversions {
+public class IcebergUtils {

Review Comment:
   Let's add a brief Javadoc for this public class.
   
   It would also be good to have a comment (not Javadoc) describing the criteria for which util methods are intended to be exposed. For context: XXIOUtils classes have inconsistent visibility settings throughout Beam (e.g. BigQueryIOUtils is public, while JdbcIOUtils is not).



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -99,36 +101,128 @@ public static Schema icebergSchemaToBeamSchema(final 
org.apache.iceberg.Schema s
     return builder.build();
   }
 
-  public static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
+  private static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
     Schema.Builder builder = Schema.builder();
     for (Types.NestedField f : struct.fields()) {
       builder.addField(icebergFieldToBeamField(f));
     }
     return builder.build();
   }
 
-  public static Types.NestedField beamFieldToIcebergField(int fieldId, final 
Schema.Field field) {
-    @Nullable Type icebergType = 
BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+  /**
+   * Represents an Object (in practice, either {@link Type} or {@link 
Types.NestedField}) along with
+   * the most recent (max) ID that has been used to build this object.
+   *
+   * <p>Iceberg Schema fields are required to have unique IDs. This includes 
unique IDs for a {@link
+   * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and 
value type, and
+   * nested {@link Types.StructType}s. When constructing any of these types, 
we use multiple unique
+   * ID's for the type's components. The {@code maxId} in this object 
represents the most recent ID
+   * used after building this type. This helps signal that the next field we 
construct should have
+   * an ID greater than this one.
+   */
+  private static class ObjectAndMaxId<T> {
+    int maxId;
+    T object;
 
-    if (icebergType != null) {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
icebergType);
-    } else {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
Types.StringType.get());
+    ObjectAndMaxId(int id, T object) {
+      this.maxId = id;
+      this.object = object;
     }
   }
 
+  private static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+      int fieldId, Schema.FieldType beamType) {
+    if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+      return new ObjectAndMaxId<>(fieldId, 
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+    } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or 
ITERABLE
+      // List ID needs to be unique from the NestedField that contains this 
ListType
+      int listId = fieldId + 1;

Review Comment:
   Is this because Iceberg list IDs start from 1? It would be good to have a comment here.
   
   This conversion appears elsewhere too; maybe put `int listId = fieldId + 1;` at the beginning of this method.
   
   Converting between 0-indexed and 1-indexed systems is a well-known, annoying, and error-prone source of bugs.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -99,36 +101,128 @@ public static Schema icebergSchemaToBeamSchema(final 
org.apache.iceberg.Schema s
     return builder.build();
   }
 
-  public static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
+  private static Schema icebergStructTypeToBeamSchema(final Types.StructType 
struct) {
     Schema.Builder builder = Schema.builder();
     for (Types.NestedField f : struct.fields()) {
       builder.addField(icebergFieldToBeamField(f));
     }
     return builder.build();
   }
 
-  public static Types.NestedField beamFieldToIcebergField(int fieldId, final 
Schema.Field field) {
-    @Nullable Type icebergType = 
BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+  /**
+   * Represents an Object (in practice, either {@link Type} or {@link 
Types.NestedField}) along with
+   * the most recent (max) ID that has been used to build this object.
+   *
+   * <p>Iceberg Schema fields are required to have unique IDs. This includes 
unique IDs for a {@link
+   * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and 
value type, and
+   * nested {@link Types.StructType}s. When constructing any of these types, 
we use multiple unique
+   * ID's for the type's components. The {@code maxId} in this object 
represents the most recent ID
+   * used after building this type. This helps signal that the next field we 
construct should have
+   * an ID greater than this one.
+   */
+  private static class ObjectAndMaxId<T> {
+    int maxId;
+    T object;
 
-    if (icebergType != null) {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
icebergType);
-    } else {
-      return Types.NestedField.of(
-          fieldId, field.getType().getNullable(), field.getName(), 
Types.StringType.get());
+    ObjectAndMaxId(int id, T object) {
+      this.maxId = id;
+      this.object = object;
     }
   }
 
+  private static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+      int fieldId, Schema.FieldType beamType) {
+    if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+      return new ObjectAndMaxId<>(fieldId, 
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+    } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or 
ITERABLE
+      // List ID needs to be unique from the NestedField that contains this 
ListType
+      int listId = fieldId + 1;
+      Schema.FieldType beamCollectionType =
+          
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
+      Type icebergCollectionType =
+          beamFieldTypeToIcebergFieldType(listId, beamCollectionType).object;
+
+      boolean elementTypeIsNullable =
+          
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
+
+      Type listType =
+          elementTypeIsNullable
+              ? Types.ListType.ofOptional(listId, icebergCollectionType)
+              : Types.ListType.ofRequired(listId, icebergCollectionType);
+
+      return new ObjectAndMaxId<>(listId, listType);
+    } else if (beamType.getTypeName().isMapType()) { // MAP
+      // key and value IDs need to be unique from the NestedField that 
contains this MapType
+      int keyId = fieldId + 1;
+      int valueId = fieldId + 2;
+
+      Schema.FieldType beamKeyType = 
Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
+      Schema.FieldType beamValueType =
+          Preconditions.checkArgumentNotNull(beamType.getMapValueType());
+
+      Type icebergKeyType = beamFieldTypeToIcebergFieldType(keyId, 
beamKeyType).object;
+      Type icebergValueType = beamFieldTypeToIcebergFieldType(valueId, 
beamValueType).object;
+
+      Type mapType =
+          beamValueType.getNullable()
+              ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType, 
icebergValueType)
+              : Types.MapType.ofRequired(keyId, valueId, icebergKeyType, 
icebergValueType);
+
+      return new ObjectAndMaxId<>(valueId, mapType);
+    } else if (beamType.getTypeName().isCompositeType()) { // ROW
+      // Nested field IDs need to be unique from the field that contains this 
StructType
+      int nestedFieldId = fieldId;
+
+      Schema nestedSchema = 
Preconditions.checkArgumentNotNull(beamType.getRowSchema());
+      List<Types.NestedField> nestedFields = new 
ArrayList<>(nestedSchema.getFieldCount());
+      for (Schema.Field field : nestedSchema.getFields()) {
+        Types.NestedField nestedField = 
beamFieldToIcebergField(++nestedFieldId, field).object;
+        nestedFields.add(nestedField);
+      }
+
+      Type structType = Types.StructType.of(nestedFields);
+
+      return new ObjectAndMaxId<>(nestedFieldId, structType);
+    }
+
+    return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
+  }
+
+  private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
+      int fieldId, final Schema.Field field) {
+    ObjectAndMaxId<Type> typeAndMaxId = 
beamFieldTypeToIcebergFieldType(fieldId, field.getType());
+    Type icebergType = typeAndMaxId.object;
+    int id = typeAndMaxId.maxId;
+
+    Types.NestedField icebergField =
+        Types.NestedField.of(fieldId, field.getType().getNullable(), 
field.getName(), icebergType);
+
+    return new ObjectAndMaxId<>(id, icebergField);
+  }
+
+  /**
+   * Converts a Beam {@link Schema} to an Iceberg {@link 
org.apache.iceberg.Schema}.
+   *
+   * <p>The following unsupported Beam types will be defaulted to {@link 
Types.StringType}:
+   * <li>{@link Schema.TypeName.DECIMAL}
+   * <li>{@link Schema.TypeName.DATETIME}
+   * <li>{@link Schema.TypeName.LOGICAL_TYPE}
+   */
   public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final 
Schema schema) {
     Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
-    int fieldId = 0;
-    for (Schema.Field f : schema.getFields()) {
-      fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
+    int nextId = 1;

Review Comment:
   Consider `icebergId` or another name that makes it clear this is an Iceberg index (which starts from 1), and use it consistently throughout IcebergUtils.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to