kennknowles commented on code in PR #31086:
URL: https://github.com/apache/beam/pull/31086#discussion_r1577015044
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java:
##########
@@ -17,197 +17,69 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
import com.google.auto.value.AutoValue;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.avro.AvroEncoderUtil;
-import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@AutoValue
-@DefaultCoder(FileWriteResult.FileWriteResultCoder.class)
+@DefaultSchema(AutoValueSchema.class)
abstract class FileWriteResult {
- public abstract TableIdentifier getTableIdentifier();
- public abstract PartitionSpec getPartitionSpec();
+ private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
+ private transient @MonotonicNonNull ManifestFile cachedManifestFile;
- public abstract DataFile getDataFile();
+ abstract String getTableIdentifierString();
- public static Builder builder() {
- return new AutoValue_FileWriteResult.Builder();
- }
+ @SuppressWarnings("mutable")
+ abstract byte[] getManifestFileBytes();
- @AutoValue.Builder
- abstract static class Builder {
- public abstract Builder setTableIdentifier(TableIdentifier tableId);
-
- public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);
-
- public abstract Builder setDataFile(DataFile dataFiles);
-
- public abstract FileWriteResult build();
- }
-
- public static class FileWriteResultCoder extends
StructuredCoder<FileWriteResult> {
- static final int VERSION = 0;
- private static final FileWriteResultCoder SINGLETON = new
FileWriteResultCoder();
-
- private static final Coder<String> tableIdentifierCoder =
StringUtf8Coder.of();
- private static final Coder<PartitionSpec> partitionSpecCoder =
- SerializableCoder.of(PartitionSpec.class);
- private static final Coder<byte[]> dataFileBytesCoder =
ByteArrayCoder.of();
-
- private static Schema getDataFileAvroSchema(FileWriteResult
fileWriteResult) {
- Types.StructType partitionType =
fileWriteResult.getPartitionSpec().partitionType();
- Types.StructType dataFileStruct = DataFile.getType(partitionType);
- Map<Types.StructType, String> dataFileNames =
- ImmutableMap.of(
- dataFileStruct, "org.apache.iceberg.GenericDataFile",
- partitionType, "org.apache.iceberg.PartitionData");
- return AvroSchemaUtil.convert(dataFileStruct, dataFileNames);
- }
-
- @Override
- public void encode(FileWriteResult value, OutputStream outStream)
- throws CoderException, IOException {
- // "version" of this coder.
- // If breaking changes are introduced (e.g. from Beam, Iceberg, Avro,
etc..),
- // then update this version and create a fork in decode() below for the
new decode logic.
- // This helps keep the pipeline update-compatible
- outStream.write(VERSION);
-
- tableIdentifierCoder.encode(value.getTableIdentifier().toString(),
outStream);
- partitionSpecCoder.encode(value.getPartitionSpec(), outStream);
- dataFileBytesCoder.encode(
- AvroEncoderUtil.encode(value.getDataFile(),
getDataFileAvroSchema(value)), outStream);
- }
-
- @Override
- public FileWriteResult decode(InputStream inStream) throws CoderException,
IOException {
- // Forking logic can be added here depending on the version of this coder
- assert inStream.read() == 0;
-
- TableIdentifier tableId =
TableIdentifier.parse(tableIdentifierCoder.decode(inStream));
- PartitionSpec partitionSpec = partitionSpecCoder.decode(inStream);
- DataFile dataFile =
- checkArgumentNotNull(
- AvroEncoderUtil.decode(dataFileBytesCoder.decode(inStream)),
- "Decoding of dataFile resulted in null");
- return FileWriteResult.builder()
- .setTableIdentifier(tableId)
- .setDataFile(dataFile)
- .setPartitionSpec(partitionSpec)
- .build();
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Collections.emptyList();
- }
-
- @Override
- public Object structuralValue(FileWriteResult fileWriteResult) {
- return new FileWriteResultDeepEqualityWrapper(fileWriteResult);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {}
-
- @Override
- public TypeDescriptor<FileWriteResult> getEncodedTypeDescriptor() {
- return TypeDescriptor.of(FileWriteResult.class);
+ @SchemaIgnore
+ public TableIdentifier getTableIdentifier() {
+ if (cachedTableIdentifier == null) {
+ cachedTableIdentifier =
TableIdentifier.parse(getTableIdentifierString());
}
+ return cachedTableIdentifier;
+ }
- public static FileWriteResultCoder of() {
- return SINGLETON;
+ @SchemaIgnore
+ public ManifestFile getManifestFile() {
+ if (cachedManifestFile == null) {
+ try {
+ cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
+ } catch (IOException exc) {
+ throw new RuntimeException("Error decoding manifest file bytes");
+ }
Review Comment:
We might be able to include the manifest file name in the error message. Given
the LGTM we already have, I think it would be worthwhile to merge now so that it
is safe to release, and then improve the error message if we have time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]