ahmedabu98 commented on code in PR #31958:
URL: https://github.com/apache/beam/pull/31958#discussion_r1690481326
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -99,36 +101,128 @@ public static Schema icebergSchemaToBeamSchema(final
org.apache.iceberg.Schema s
return builder.build();
}
- public static Schema icebergStructTypeToBeamSchema(final Types.StructType
struct) {
+ private static Schema icebergStructTypeToBeamSchema(final Types.StructType
struct) {
Schema.Builder builder = Schema.builder();
for (Types.NestedField f : struct.fields()) {
builder.addField(icebergFieldToBeamField(f));
}
return builder.build();
}
- public static Types.NestedField beamFieldToIcebergField(int fieldId, final
Schema.Field field) {
- @Nullable Type icebergType =
BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+ /**
+ * Represents an Object (in practice, either {@link Type} or {@link
Types.NestedField}) along with
+ * the most recent (max) ID that has been used to build this object.
+ *
+ * <p>Iceberg Schema fields are required to have unique IDs. This includes
unique IDs for a {@link
+ * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and
value type, and
+ * nested {@link Types.StructType}s. When constructing any of these types,
we use multiple unique
+ * IDs for the type's components. The {@code maxId} in this object
represents the most recent ID
+ * used after building this type. This helps signal that the next field we
construct should have
+ * an ID greater than this one.
+ */
+ private static class ObjectAndMaxId<T> {
+ int maxId;
+ T object;
- if (icebergType != null) {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(),
icebergType);
- } else {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(),
Types.StringType.get());
+ ObjectAndMaxId(int id, T object) {
+ this.maxId = id;
+ this.object = object;
}
}
+ private static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+ int fieldId, Schema.FieldType beamType) {
+ if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+ return new ObjectAndMaxId<>(fieldId,
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+ } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or
ITERABLE
+ // List ID needs to be unique from the NestedField that contains this
ListType
+ int listId = fieldId + 1;
+ Schema.FieldType beamCollectionType =
+
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
+ Type icebergCollectionType =
+ beamFieldTypeToIcebergFieldType(listId, beamCollectionType).object;
+
+ boolean elementTypeIsNullable =
+
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
+
+ Type listType =
+ elementTypeIsNullable
+ ? Types.ListType.ofOptional(listId, icebergCollectionType)
+ : Types.ListType.ofRequired(listId, icebergCollectionType);
+
+ return new ObjectAndMaxId<>(listId, listType);
+ } else if (beamType.getTypeName().isMapType()) { // MAP
+ // key and value IDs need to be unique from the NestedField that
contains this MapType
+ int keyId = fieldId + 1;
+ int valueId = fieldId + 2;
+
+ Schema.FieldType beamKeyType =
Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
+ Schema.FieldType beamValueType =
+ Preconditions.checkArgumentNotNull(beamType.getMapValueType());
+
+ Type icebergKeyType = beamFieldTypeToIcebergFieldType(keyId,
beamKeyType).object;
+ Type icebergValueType = beamFieldTypeToIcebergFieldType(valueId,
beamValueType).object;
+
+ Type mapType =
+ beamValueType.getNullable()
+ ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType,
icebergValueType)
+ : Types.MapType.ofRequired(keyId, valueId, icebergKeyType,
icebergValueType);
+
+ return new ObjectAndMaxId<>(valueId, mapType);
+ } else if (beamType.getTypeName().isCompositeType()) { // ROW
+ // Nested field IDs need to be unique from the field that contains this
StructType
+ int nestedFieldId = fieldId;
+
+ Schema nestedSchema =
Preconditions.checkArgumentNotNull(beamType.getRowSchema());
+ List<Types.NestedField> nestedFields = new
ArrayList<>(nestedSchema.getFieldCount());
+ for (Schema.Field field : nestedSchema.getFields()) {
+ Types.NestedField nestedField =
beamFieldToIcebergField(++nestedFieldId, field).object;
+ nestedFields.add(nestedField);
+ }
+
+ Type structType = Types.StructType.of(nestedFields);
+
+ return new ObjectAndMaxId<>(nestedFieldId, structType);
+ }
+
+ return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
+ }
+
+ private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
+ int fieldId, final Schema.Field field) {
+ ObjectAndMaxId<Type> typeAndMaxId =
beamFieldTypeToIcebergFieldType(fieldId, field.getType());
+ Type icebergType = typeAndMaxId.object;
+ int id = typeAndMaxId.maxId;
+
+ Types.NestedField icebergField =
+ Types.NestedField.of(fieldId, field.getType().getNullable(),
field.getName(), icebergType);
+
+ return new ObjectAndMaxId<>(id, icebergField);
+ }
+
+ /**
+ * Converts a Beam {@link Schema} to an Iceberg {@link
org.apache.iceberg.Schema}.
+ *
+ * <p>The following unsupported Beam types will be defaulted to {@link
Types.StringType}:
+ * <ul>
+ *   <li>{@link Schema.TypeName#DECIMAL}
+ *   <li>{@link Schema.TypeName#DATETIME}
+ *   <li>{@link Schema.TypeName#LOGICAL_TYPE}
+ * </ul>
+ */
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final
Schema schema) {
Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
- int fieldId = 0;
- for (Schema.Field f : schema.getFields()) {
- fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
+ int nextId = 1;
Review Comment:
P.S. I made `nextIcebergFieldId` start from 1 here because I saw many
examples online of Iceberg Schema field IDs starting from 1. It's definitely
not a rule, but it appears to be a convention.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]