ahmedabu98 commented on code in PR #31086:
URL: https://github.com/apache/beam/pull/31086#discussion_r1576827187


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java:
##########
@@ -17,197 +17,69 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.avro.AvroEncoderUtil;
-import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 @AutoValue
-@DefaultCoder(FileWriteResult.FileWriteResultCoder.class)
+@DefaultSchema(AutoValueSchema.class)
 abstract class FileWriteResult {
-  public abstract TableIdentifier getTableIdentifier();
 
-  public abstract PartitionSpec getPartitionSpec();
+  private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
+  private transient @MonotonicNonNull ManifestFile cachedManifestFile;
 
-  public abstract DataFile getDataFile();
+  abstract String getTableIdentifierString();
 
-  public static Builder builder() {
-    return new AutoValue_FileWriteResult.Builder();
-  }
+  @SuppressWarnings("mutable")
+  abstract byte[] getManifestFileBytes();
 
-  @AutoValue.Builder
-  abstract static class Builder {
-    public abstract Builder setTableIdentifier(TableIdentifier tableId);
-
-    public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);
-
-    public abstract Builder setDataFile(DataFile dataFiles);
-
-    public abstract FileWriteResult build();
-  }
-
-  public static class FileWriteResultCoder extends 
StructuredCoder<FileWriteResult> {
-    static final int VERSION = 0;
-    private static final FileWriteResultCoder SINGLETON = new 
FileWriteResultCoder();
-
-    private static final Coder<String> tableIdentifierCoder = 
StringUtf8Coder.of();
-    private static final Coder<PartitionSpec> partitionSpecCoder =
-        SerializableCoder.of(PartitionSpec.class);
-    private static final Coder<byte[]> dataFileBytesCoder = 
ByteArrayCoder.of();
-
-    private static Schema getDataFileAvroSchema(FileWriteResult 
fileWriteResult) {
-      Types.StructType partitionType = 
fileWriteResult.getPartitionSpec().partitionType();
-      Types.StructType dataFileStruct = DataFile.getType(partitionType);
-      Map<Types.StructType, String> dataFileNames =
-          ImmutableMap.of(
-              dataFileStruct, "org.apache.iceberg.GenericDataFile",
-              partitionType, "org.apache.iceberg.PartitionData");
-      return AvroSchemaUtil.convert(dataFileStruct, dataFileNames);
-    }
-
-    @Override
-    public void encode(FileWriteResult value, OutputStream outStream)
-        throws CoderException, IOException {
-      // "version" of this coder.
-      // If breaking changes are introduced (e.g. from Beam, Iceberg, Avro, 
etc..),
-      // then update this version and create a fork in decode() below for the 
new decode logic.
-      // This helps keep the pipeline update-compatible
-      outStream.write(VERSION);
-
-      tableIdentifierCoder.encode(value.getTableIdentifier().toString(), 
outStream);
-      partitionSpecCoder.encode(value.getPartitionSpec(), outStream);
-      dataFileBytesCoder.encode(
-          AvroEncoderUtil.encode(value.getDataFile(), 
getDataFileAvroSchema(value)), outStream);
-    }
-
-    @Override
-    public FileWriteResult decode(InputStream inStream) throws CoderException, 
IOException {
-      // Forking logic can be added here depending on the version of this coder
-      assert inStream.read() == 0;
-
-      TableIdentifier tableId = 
TableIdentifier.parse(tableIdentifierCoder.decode(inStream));
-      PartitionSpec partitionSpec = partitionSpecCoder.decode(inStream);
-      DataFile dataFile =
-          checkArgumentNotNull(
-              AvroEncoderUtil.decode(dataFileBytesCoder.decode(inStream)),
-              "Decoding of dataFile resulted in null");
-      return FileWriteResult.builder()
-          .setTableIdentifier(tableId)
-          .setDataFile(dataFile)
-          .setPartitionSpec(partitionSpec)
-          .build();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Collections.emptyList();
-    }
-
-    @Override
-    public Object structuralValue(FileWriteResult fileWriteResult) {
-      return new FileWriteResultDeepEqualityWrapper(fileWriteResult);
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {}
-
-    @Override
-    public TypeDescriptor<FileWriteResult> getEncodedTypeDescriptor() {
-      return TypeDescriptor.of(FileWriteResult.class);
+  @SchemaIgnore
+  public TableIdentifier getTableIdentifier() {
+    if (cachedTableIdentifier == null) {
+      cachedTableIdentifier = 
TableIdentifier.parse(getTableIdentifierString());
     }
+    return cachedTableIdentifier;
+  }
 
-    public static FileWriteResultCoder of() {
-      return SINGLETON;
+  @SchemaIgnore
+  public ManifestFile getManifestFile() {
+    if (cachedManifestFile == null) {
+      try {
+        cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
+      } catch (IOException exc) {
+        throw new RuntimeException("Error decoding manifest file bytes");
+      }

Review Comment:
   Can we include the table identifier in this error message for debuggability,
   or is it PII?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to