Abacn commented on code in PR #31958:
URL: https://github.com/apache/beam/pull/31958#discussion_r1690165473
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -20,36 +20,38 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.Row;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-class SchemaAndRowConversions {
+public class IcebergUtils {
Review Comment:
Let's add a brief Javadoc for this public class.
It would also be good to have a comment (not Javadoc) describing the criteria for
which kinds of util methods are intended to be exposed. For context, XXIOUtils
classes have inconsistent visibility settings throughout Beam (e.g.
BigQueryIOUtils is public, JdbcIOUtils is not).
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -99,36 +101,128 @@ public static Schema icebergSchemaToBeamSchema(final
org.apache.iceberg.Schema s
return builder.build();
}
- public static Schema icebergStructTypeToBeamSchema(final Types.StructType
struct) {
+ private static Schema icebergStructTypeToBeamSchema(final Types.StructType
struct) {
Schema.Builder builder = Schema.builder();
for (Types.NestedField f : struct.fields()) {
builder.addField(icebergFieldToBeamField(f));
}
return builder.build();
}
- public static Types.NestedField beamFieldToIcebergField(int fieldId, final
Schema.Field field) {
- @Nullable Type icebergType =
BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+ /**
+ * Represents an Object (in practice, either {@link Type} or {@link
Types.NestedField}) along with
+ * the most recent (max) ID that has been used to build this object.
+ *
+ * <p>Iceberg Schema fields are required to have unique IDs. This includes
unique IDs for a {@link
+ * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and
value type, and
+ * nested {@link Types.StructType}s. When constructing any of these types,
we use multiple unique
+ * ID's for the type's components. The {@code maxId} in this object
represents the most recent ID
+ * used after building this type. This helps signal that the next field we
construct should have
+ * an ID greater than this one.
+ */
+ private static class ObjectAndMaxId<T> {
+ int maxId;
+ T object;
- if (icebergType != null) {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(),
icebergType);
- } else {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(),
Types.StringType.get());
+ ObjectAndMaxId(int id, T object) {
+ this.maxId = id;
+ this.object = object;
}
}
+ private static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+ int fieldId, Schema.FieldType beamType) {
+ if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+ return new ObjectAndMaxId<>(fieldId,
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+ } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or
ITERABLE
+ // List ID needs to be unique from the NestedField that contains this
ListType
+ int listId = fieldId + 1;
Review Comment:
Is this because Iceberg list IDs start from 1? It would be good to have a comment
here.
This conversion appears elsewhere too; maybe put `int listId = fieldId + 1;` at
the beginning of this method.
Converting between 0-indexed and 1-indexed systems is a well-known, annoying, and
error-prone thing.
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -99,36 +101,128 @@ public static Schema icebergSchemaToBeamSchema(final
org.apache.iceberg.Schema s
return builder.build();
}
- public static Schema icebergStructTypeToBeamSchema(final Types.StructType
struct) {
+ private static Schema icebergStructTypeToBeamSchema(final Types.StructType
struct) {
Schema.Builder builder = Schema.builder();
for (Types.NestedField f : struct.fields()) {
builder.addField(icebergFieldToBeamField(f));
}
return builder.build();
}
- public static Types.NestedField beamFieldToIcebergField(int fieldId, final
Schema.Field field) {
- @Nullable Type icebergType =
BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());
+ /**
+ * Represents an Object (in practice, either {@link Type} or {@link
Types.NestedField}) along with
+ * the most recent (max) ID that has been used to build this object.
+ *
+ * <p>Iceberg Schema fields are required to have unique IDs. This includes
unique IDs for a {@link
+ * Types.ListType}'s collection type, a {@link Types.MapType}'s key type and
value type, and
+ * nested {@link Types.StructType}s. When constructing any of these types,
we use multiple unique
+ * ID's for the type's components. The {@code maxId} in this object
represents the most recent ID
+ * used after building this type. This helps signal that the next field we
construct should have
+ * an ID greater than this one.
+ */
+ private static class ObjectAndMaxId<T> {
+ int maxId;
+ T object;
- if (icebergType != null) {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(),
icebergType);
- } else {
- return Types.NestedField.of(
- fieldId, field.getType().getNullable(), field.getName(),
Types.StringType.get());
+ ObjectAndMaxId(int id, T object) {
+ this.maxId = id;
+ this.object = object;
}
}
+ private static ObjectAndMaxId<Type> beamFieldTypeToIcebergFieldType(
+ int fieldId, Schema.FieldType beamType) {
+ if (BEAM_TYPES_TO_ICEBERG_TYPES.containsKey(beamType.getTypeName())) {
+ return new ObjectAndMaxId<>(fieldId,
BEAM_TYPES_TO_ICEBERG_TYPES.get(beamType.getTypeName()));
+ } else if (beamType.getTypeName().isCollectionType()) { // ARRAY or
ITERABLE
+ // List ID needs to be unique from the NestedField that contains this
ListType
+ int listId = fieldId + 1;
+ Schema.FieldType beamCollectionType =
+
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType());
+ Type icebergCollectionType =
+ beamFieldTypeToIcebergFieldType(listId, beamCollectionType).object;
+
+ boolean elementTypeIsNullable =
+
Preconditions.checkArgumentNotNull(beamType.getCollectionElementType()).getNullable();
+
+ Type listType =
+ elementTypeIsNullable
+ ? Types.ListType.ofOptional(listId, icebergCollectionType)
+ : Types.ListType.ofRequired(listId, icebergCollectionType);
+
+ return new ObjectAndMaxId<>(listId, listType);
+ } else if (beamType.getTypeName().isMapType()) { // MAP
+ // key and value IDs need to be unique from the NestedField that
contains this MapType
+ int keyId = fieldId + 1;
+ int valueId = fieldId + 2;
+
+ Schema.FieldType beamKeyType =
Preconditions.checkArgumentNotNull(beamType.getMapKeyType());
+ Schema.FieldType beamValueType =
+ Preconditions.checkArgumentNotNull(beamType.getMapValueType());
+
+ Type icebergKeyType = beamFieldTypeToIcebergFieldType(keyId,
beamKeyType).object;
+ Type icebergValueType = beamFieldTypeToIcebergFieldType(valueId,
beamValueType).object;
+
+ Type mapType =
+ beamValueType.getNullable()
+ ? Types.MapType.ofOptional(keyId, valueId, icebergKeyType,
icebergValueType)
+ : Types.MapType.ofRequired(keyId, valueId, icebergKeyType,
icebergValueType);
+
+ return new ObjectAndMaxId<>(valueId, mapType);
+ } else if (beamType.getTypeName().isCompositeType()) { // ROW
+ // Nested field IDs need to be unique from the field that contains this
StructType
+ int nestedFieldId = fieldId;
+
+ Schema nestedSchema =
Preconditions.checkArgumentNotNull(beamType.getRowSchema());
+ List<Types.NestedField> nestedFields = new
ArrayList<>(nestedSchema.getFieldCount());
+ for (Schema.Field field : nestedSchema.getFields()) {
+ Types.NestedField nestedField =
beamFieldToIcebergField(++nestedFieldId, field).object;
+ nestedFields.add(nestedField);
+ }
+
+ Type structType = Types.StructType.of(nestedFields);
+
+ return new ObjectAndMaxId<>(nestedFieldId, structType);
+ }
+
+ return new ObjectAndMaxId<>(fieldId, Types.StringType.get());
+ }
+
+ private static ObjectAndMaxId<Types.NestedField> beamFieldToIcebergField(
+ int fieldId, final Schema.Field field) {
+ ObjectAndMaxId<Type> typeAndMaxId =
beamFieldTypeToIcebergFieldType(fieldId, field.getType());
+ Type icebergType = typeAndMaxId.object;
+ int id = typeAndMaxId.maxId;
+
+ Types.NestedField icebergField =
+ Types.NestedField.of(fieldId, field.getType().getNullable(),
field.getName(), icebergType);
+
+ return new ObjectAndMaxId<>(id, icebergField);
+ }
+
+ /**
+ * Converts a Beam {@link Schema} to an Iceberg {@link
org.apache.iceberg.Schema}.
+ *
+ * <p>The following unsupported Beam types will be defaulted to {@link
Types.StringType}:
+ * <li>{@link Schema.TypeName.DECIMAL}
+ * <li>{@link Schema.TypeName.DATETIME}
+ * <li>{@link Schema.TypeName.LOGICAL_TYPE}
+ */
public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final
Schema schema) {
Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
- int fieldId = 0;
- for (Schema.Field f : schema.getFields()) {
- fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
+ int nextId = 1;
Review Comment:
Consider `icebergId` or another name that makes it clear this is an Iceberg index
(starting from 1), throughout IcebergUtils.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]