kennknowles commented on code in PR #30797:
URL: https://github.com/apache/beam/pull/30797#discussion_r1552651757


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/IcebergIO.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+public class IcebergIO {
+
+  public static WriteRows writeToDynamicDestinations(

Review Comment:
   We could do that. I was thinking that we might add convenience methods later, like `writeToTable(catalog, table_id)`, so I made this name extra long. We can always add others. I tend to prefer distinct method names over overloading.
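
   A hedged sketch of what such a convenience method could look like, assuming `writeToDynamicDestinations(catalog, destinations)` is the general entry point (its full signature is truncated in this hunk, so treat the shape as an assumption):

   ```java
   // Illustrative only: a single-table write is just the dynamic-destinations
   // write with a destination function that routes every row to one table.
   public static WriteRows writeToTable(
       IcebergCatalogConfig catalog, TableIdentifier tableId) {
     return writeToDynamicDestinations(
         catalog, new OneTableDynamicDestinations(tableId));
   }
   ```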



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))

Review Comment:
   Done



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/FileWriteResult.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderProvider;
+import org.apache.beam.sdk.coders.CoderProviders;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.avro.AvroEncoderUtil;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+
+@AutoValue
+@DefaultCoder(FileWriteResult.FileWriteResultCoder.class)
+abstract class FileWriteResult {
+  public abstract TableIdentifier getTableIdentifier();
+
+  public abstract PartitionSpec getPartitionSpec();
+
+  public abstract DataFile getDataFile();
+
+  public static Builder builder() {
+    return new AutoValue_FileWriteResult.Builder();
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    public abstract Builder setTableIdentifier(TableIdentifier tableId);
+
+    public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);
+
+    public abstract Builder setDataFile(DataFile dataFiles);
+
+    public abstract FileWriteResult build();
+  }
+
+  public static class FileWriteResultCoder extends StructuredCoder<FileWriteResult> {

Review Comment:
   Done, somewhat. It could use some data generators to test this more thoroughly.
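
   For example, a minimal round-trip check could lean on Beam's `CoderProperties`. This sketch assumes an unpartitioned spec, a hand-built `DataFile`, and that the AutoValue equality of `FileWriteResult` is meaningful for the wrapped `DataFile`:

   ```java
   import org.apache.beam.sdk.testing.CoderProperties;
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.DataFiles;
   import org.apache.iceberg.FileFormat;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.catalog.TableIdentifier;
   import org.junit.Test;

   public class FileWriteResultCoderTest {
     @Test
     public void testDecodeEncodeEqual() throws Exception {
       // An unpartitioned spec keeps the fixture small; a generator could
       // vary schemas, specs, and file metadata for broader coverage.
       PartitionSpec spec = PartitionSpec.unpartitioned();
       DataFile dataFile =
           DataFiles.builder(spec)
               .withPath("/tmp/data-00000.parquet")
               .withFormat(FileFormat.PARQUET)
               .withFileSizeInBytes(1024L)
               .withRecordCount(10L)
               .build();
       FileWriteResult result =
           FileWriteResult.builder()
               .setTableIdentifier(TableIdentifier.parse("default.test_table"))
               .setPartitionSpec(spec)
               .setDataFile(dataFile)
               .build();
       CoderProperties.coderDecodeEncodeEqual(
           new FileWriteResult.FileWriteResultCoder(), result);
     }
   }
   ```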



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/OneTableDynamicDestinations.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class OneTableDynamicDestinations implements DynamicDestinations {
+
+  private static final Schema EMPTY_SCHEMA = Schema.builder().build();
+  private static final Row EMPTY_ROW = Row.nullRow(EMPTY_SCHEMA);
+
+  // TableId represented as String for serializability
+  private final String tableIdString;
+
+  private transient @MonotonicNonNull TableIdentifier tableId;
+
+  private TableIdentifier getTableIdentifier() {
+    if (tableId == null) {
+      tableId = TableIdentifier.parse(tableIdString);
+    }
+    return tableId;
+  }
+
+  OneTableDynamicDestinations(TableIdentifier tableId) {
+    this.tableIdString = tableId.toString();
+  }
+
+  @Override
+  public Schema getMetadataSchema() {
+    return EMPTY_SCHEMA;
+  }
+
+  @Override
+  public Row assignDestinationMetadata(Row data) {
+    return EMPTY_ROW;
+  }
+
+  @Override
+  public IcebergDestination instantiateDestination(Row dest) {
+    return IcebergDestination.builder()
+        .setTableIdentifier(getTableIdentifier())
+        .setTableCreateConfig(null)
+        .setFileFormat(FileFormat.PARQUET)

Review Comment:
   It should be configurable. In testing, I discovered that the ORC codepath doesn't work, so I've changed it to throw.
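
   A sketch of the guard that change implies (illustrative; `validate` is a hypothetical helper, and exactly which formats the PR accepts is an assumption):

   ```java
   import org.apache.iceberg.FileFormat;

   class FileFormatValidation {
     // Only the Parquet write path has been exercised here; reject the
     // rest loudly rather than writing broken tables.
     static FileFormat validate(FileFormat format) {
       if (format != FileFormat.PARQUET) {
         throw new UnsupportedOperationException(
             "File format " + format + " is not supported yet");
       }
       return format;
     }
   }
   ```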



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/SchemaHelper.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+@SuppressWarnings({"dereference.of.nullable"})
+class SchemaHelper {
+
+  private SchemaHelper() {}
+
+  public static final String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID";
+
+  public static Schema.FieldType fieldTypeForType(final Type type) {
+    switch (type.typeId()) {
+      case BOOLEAN:
+        return FieldType.BOOLEAN;
+      case INTEGER:
+        return FieldType.INT32;
+      case LONG:
+        return FieldType.INT64;
+      case FLOAT:
+        return FieldType.FLOAT;
+      case DOUBLE:
+        return FieldType.DOUBLE;
+      case DATE:
+      case TIME:
+      case TIMESTAMP: // TODO: Logical types?
+        return FieldType.DATETIME;
+      case STRING:
+        return FieldType.STRING;
+      case UUID:

Review Comment:
   Yeah, it is a Java `UUID`, which boils down to 16 bytes.
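
   A sketch of that 16-byte packing, for reference (standard `ByteBuffer` conversion, not code from this PR):

   ```java
   import java.nio.ByteBuffer;
   import java.util.UUID;

   class UuidBytes {
     // A java.util.UUID is two longs; pack them into the fixed 16-byte
     // form that Iceberg/Avro use for UUID values.
     static byte[] toBytes(UUID uuid) {
       ByteBuffer buf = ByteBuffer.allocate(16);
       buf.putLong(uuid.getMostSignificantBits());
       buf.putLong(uuid.getLeastSignificantBits());
       return buf.array();
     }
   }
   ```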



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/RowHelper.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import java.util.Optional;
+import org.apache.beam.sdk.values.Row;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types.NestedField;
+
+class RowHelper {
+
+  // static helper functions only
+  private RowHelper() {}
+
+  public static Record rowToRecord(org.apache.iceberg.Schema schema, Row row) {
+    return copy(GenericRecord.create(schema), row);
+  }
+
+  private static Record copy(Record baseRecord, Row value) {
+    Record rec = baseRecord.copy();
+    for (NestedField f : rec.struct().fields()) {
+      copyInto(rec, f, value);
+    }
+    return rec;
+  }
+
+  private static void copyInto(Record rec, NestedField field, Row value) {
+    String name = field.name();
+    switch (field.type().typeId()) {
+      case BOOLEAN:
+        Optional.ofNullable(value.getBoolean(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case INTEGER:
+        Optional.ofNullable(value.getInt32(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case LONG:
+        Optional.ofNullable(value.getInt64(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case FLOAT:
+        Optional.ofNullable(value.getFloat(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case DOUBLE:
+        Optional.ofNullable(value.getDouble(name)).ifPresent(v -> rec.setField(name, v));
+        break;
+      case DATE:

Review Comment:
   omg yes, haha, I didn't notice this. Fixed: added some more support and testing for some types, and we now throw for the ones that are not yet supported. We will want to fast-follow with support, but some of the date semantics are unclear to me (like, an Iceberg DATE is stored as a plain number, but I'm not sure exactly what it represents).
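
   For reference, my reading of the Iceberg spec is that DATE is encoded as days since 1970-01-01 (an `int`). Under that assumption, a hedged sketch of the Beam-to-Iceberg conversion:

   ```java
   import java.time.LocalDate;
   import org.joda.time.ReadableDateTime;

   class DateConversion {
     // Sketch: Beam's DATETIME surfaces as a joda ReadableDateTime;
     // Iceberg's DATE is assumed to be days since the Unix epoch.
     static int toIcebergDate(ReadableDateTime dt) {
       return (int)
           LocalDate.of(dt.getYear(), dt.getMonthOfYear(), dt.getDayOfMonth())
               .toEpochDay();
     }
   }
   ```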



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/io/iceberg/AppendFilesToTables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.iceberg;
+
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+class AppendFilesToTables
+    extends PTransform<PCollection<FileWriteResult>, PCollection<KV<String, Snapshot>>> {
+
+  private final IcebergCatalogConfig catalogConfig;
+
+  AppendFilesToTables(IcebergCatalogConfig catalogConfig) {
+    this.catalogConfig = catalogConfig;
+  }
+
+  @Override
+  public PCollection<KV<String, Snapshot>> expand(PCollection<FileWriteResult> writtenFiles) {
+
+    // Apply any sharded writes and flatten everything for catalog updates
+    return writtenFiles
+        .apply(
+            "Key metadata updates by table",
+            WithKeys.of(
+                new SerializableFunction<FileWriteResult, String>() {
+                  @Override
+                  public String apply(FileWriteResult input) {
+                    return input.getTableIdentifier().toString();
+                  }
+                }))
+        // .setCoder(KvCoder.of(StringUtf8Coder.of(), new MetadataUpdate.MetadataUpdateCoder()))
+        .apply("Group metadata updates by table", GroupByKey.create())
+        .apply(
+            "Append metadata updates to tables",
+            ParDo.of(new AppendFilesToTablesDoFn(catalogConfig)))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializableCoder.of(Snapshot.class)));
+  }
+
+  private static class AppendFilesToTablesDoFn
+      extends DoFn<KV<String, Iterable<FileWriteResult>>, KV<String, Snapshot>> {
+
+    private final IcebergCatalogConfig catalogConfig;
+
+    private transient @MonotonicNonNull Catalog catalog;
+
+    private AppendFilesToTablesDoFn(IcebergCatalogConfig catalogConfig) {
+      this.catalogConfig = catalogConfig;
+    }
+
+    private Catalog getCatalog() {
+      if (catalog == null) {
+        catalog = catalogConfig.catalog();
+      }
+      return catalog;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, Iterable<FileWriteResult>> element,
+        OutputReceiver<KV<String, Snapshot>> out,
+        BoundedWindow window) {
+      Table table = getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
+      AppendFiles update = table.newAppend();
+      for (FileWriteResult writtenFile : element.getValue()) {
+        update.appendFile(writtenFile.getDataFile());
+      }
+      update.commit();

Review Comment:
   All the files per destination are grouped into a single atomic commit. There are two things that could go wrong:
   
   1. Failure after the commit but before downstream processing, so a new transaction will try to append the same files. I verified that this is idempotent (and I included a unit test just to clarify); see the sketch after this list.
   2. Some tables commit successfully, but then there are enough failures that the pipeline itself fails. We probably _can_ do a multi-table transaction: we would write the various files to manifests, then merge to a single thread and commit all the manifests at once. We don't do this for other sinks, do we?
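
   A rough shape for the idempotency check in point 1, assuming a test `Table` fixture and an already-written `DataFile` (names are illustrative, not the PR's actual test):

   ```java
   import org.apache.iceberg.DataFile;
   import org.apache.iceberg.Table;

   class AppendRetrySketch {
     // Re-running an append after a post-commit failure should be safe:
     // committing the same DataFile twice must not duplicate data.
     static void appendTwice(Table table, DataFile dataFile) {
       table.newAppend().appendFile(dataFile).commit();
       // Simulated retry after a failure downstream of the first commit.
       table.newAppend().appendFile(dataFile).commit();
       // A real test would scan the table and assert each record appears once.
     }
   }
   ```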



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
