kennknowles commented on code in PR #30805:
URL: https://github.com/apache/beam/pull/30805#discussion_r1559800981


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaAndRowConversions.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+class SchemaAndRowConversions {
+
+  private SchemaAndRowConversions() {}
+
+  public static final String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID";
+
+  public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return Schema.FieldType.BOOLEAN;
+      case INTEGER:
+        return Schema.FieldType.INT32;
+      case LONG:
+        return Schema.FieldType.INT64;
+      case FLOAT:
+        return Schema.FieldType.FLOAT;
+      case DOUBLE:
+        return Schema.FieldType.DOUBLE;
+      case DATE:
+      case TIME:
+      case TIMESTAMP: // TODO: Logical types?
+        return Schema.FieldType.DATETIME;
+      case STRING:
+        return Schema.FieldType.STRING;
+      case UUID:
+      case BINARY:
+        return Schema.FieldType.BYTES;
+      case FIXED:
+      case DECIMAL:
+        return Schema.FieldType.DECIMAL;
+      case STRUCT:
+        return Schema.FieldType.row(icebergStructTypeToBeamSchema(type.asStructType()));
+      case LIST:
+        return Schema.FieldType.iterable(
+            icebergTypeToBeamFieldType(type.asListType().elementType()));
+      case MAP:
+        return Schema.FieldType.map(
+            icebergTypeToBeamFieldType(type.asMapType().keyType()),
+            icebergTypeToBeamFieldType(type.asMapType().valueType()));
+    }
+    throw new RuntimeException("Unrecognized IcebergIO Type");
+  }
+
+  public static Schema.Field icebergFieldToBeamField(final Types.NestedField field) {
+    return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type()))
+        .withOptions(
+            Schema.Options.builder()
+                .setOption(
+                    ICEBERG_TYPE_OPTION_NAME, Schema.FieldType.STRING, field.type().typeId().name())
+                .build())
+        .withNullable(field.isOptional());
+  }
+
+  public static Schema icebergSchemaToBeamSchema(final org.apache.iceberg.Schema schema) {
+    Schema.Builder builder = Schema.builder();
+    for (Types.NestedField f : schema.columns()) {
+      builder.addField(icebergFieldToBeamField(f));
+    }
+    return builder.build();
+  }
+
+  public static Schema icebergStructTypeToBeamSchema(final Types.StructType struct) {
+    Schema.Builder builder = Schema.builder();
+    for (Types.NestedField f : struct.fields()) {
+      builder.addField(icebergFieldToBeamField(f));
+    }
+    return builder.build();
+  }
+
+  public static Types.NestedField beamFieldToIcebergField(int fieldId, final Schema.Field field) {
+    String typeId = field.getOptions().getValue(ICEBERG_TYPE_OPTION_NAME, String.class);
+    if (typeId != null) {
+      return Types.NestedField.of(
+          fieldId,
+          field.getType().getNullable(),
+          field.getName(),
+          Types.fromPrimitiveString(typeId));
+    } else {
+      return Types.NestedField.of(
+          fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get());
+    }
+  }
+
+  public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema schema) {
+    Types.NestedField[] fields = new Types.NestedField[schema.getFieldCount()];
+    int fieldId = 0;
+    for (Schema.Field f : schema.getFields()) {
+      fields[fieldId++] = beamFieldToIcebergField(fieldId, f);
+    }
+    return new org.apache.iceberg.Schema(fields);
+  }
+
+  public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) {
+    return copyRowIntoRecord(GenericRecord.create(schema), row);
+  }
+
+  private static Record copyRowIntoRecord(Record baseRecord, Row value) {
+    Record rec = baseRecord.copy();
+    for (Types.NestedField f : rec.struct().fields()) {
+      copyFieldIntoRecord(rec, f, value);
+    }
+    return rec;
+  }
+
+  private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row value) {
+    String name = field.name();
+    switch (field.type().typeId()) {
+      case BOOLEAN:
+        Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case INTEGER:
+        Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case LONG:
+        Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case FLOAT:
+        Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case DOUBLE:
+        Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case DATE:
+        throw new UnsupportedOperationException("Date fields not yet supported");
+      case TIME:
+        throw new UnsupportedOperationException("Time fields not yet supported");
+      case TIMESTAMP:
+        Optional.ofNullable(value.getDateTime(name))
+            .ifPresent(v -> rec.setField(name, v.getMillis()));
+        break;
+      case STRING:
+        Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case UUID:
+        Optional.ofNullable(value.getBytes(name))
+            .ifPresent(v -> rec.setField(name, UUID.nameUUIDFromBytes(v)));
+        break;
+      case FIXED:
+        throw new UnsupportedOperationException("Fixed-precision fields are not yet supported.");
+      case BINARY:
+        Optional.ofNullable(value.getBytes(name))
+            .ifPresent(v -> rec.setField(name, ByteBuffer.wrap(v)));
+        break;
+      case DECIMAL:
+        Optional.ofNullable(value.getDecimal(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case STRUCT:
+        Optional.ofNullable(value.getRow(name))
+            .ifPresent(
+                row ->
+                    rec.setField(
+                        name,
+                        copyRowIntoRecord(GenericRecord.create(field.type().asStructType()), row)));
+        break;
+      case LIST:
+        throw new UnsupportedOperationException("List fields are not yet supported.");
+      case MAP:
+        throw new UnsupportedOperationException("Map fields are not yet supported.");
+    }
+  }
+
+  public static Row recordToRow(Schema schema, Record record) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Schema.Field field : schema.getFields()) {
+      switch (field.getType().getTypeName()) {
+        case BYTE:
+          // I guess allow anything we can cast here

Review Comment:
   Done (I can refactor the tests to share pairs of inputs later, but in any case I have added the reverse tests.)
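
   For reference, the "reverse tests" are round-trip checks along roughly these lines. This is a minimal sketch only: the test class name, schema, field name, and sample value are illustrative assumptions, not the exact tests added in the PR.

   package org.apache.beam.io.iceberg;

   import static org.junit.Assert.assertEquals;

   import org.apache.beam.sdk.schemas.Schema;
   import org.apache.beam.sdk.values.Row;
   import org.apache.iceberg.data.Record;
   import org.junit.Test;

   // Hypothetical test class; kept in org.apache.beam.io.iceberg so it can see
   // the package-private SchemaAndRowConversions.
   public class SchemaAndRowConversionsRoundTripTest {

     // Assumed single string-field Beam schema, for illustration only.
     private static final Schema BEAM_SCHEMA =
         Schema.builder().addStringField("name").build();

     @Test
     public void testRowToRecordRoundTrip() {
       org.apache.iceberg.Schema icebergSchema =
           SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA);
       Row original = Row.withSchema(BEAM_SCHEMA).addValue("a_string_value").build();

       // Forward conversion: Beam Row -> Iceberg Record.
       Record record = SchemaAndRowConversions.rowToRecord(icebergSchema, original);
       // Reverse conversion: Iceberg Record -> Beam Row.
       Row roundTripped = SchemaAndRowConversions.recordToRow(BEAM_SCHEMA, record);

       assertEquals(original, roundTripped);
     }
   }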


