This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch release-2.56.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.56.0 by this push:
new d07cc622c80 Simplify intermediate data in Iceberg sink; use manifest
files (#31090)
d07cc622c80 is described below
commit d07cc622c807d1078437ddbd931feefe06dfb52c
Author: Kenneth Knowles <[email protected]>
AuthorDate: Fri Apr 26 06:55:58 2024 -0400
Simplify intermediate data in Iceberg sink; use manifest files (#31090)
---
sdks/java/io/iceberg/build.gradle | 1 -
.../beam/sdk/io/iceberg/AppendFilesToTables.java | 2 +-
.../beam/sdk/io/iceberg/FileWriteResult.java | 210 ++++-----------------
.../apache/beam/sdk/io/iceberg/RecordWriter.java | 20 +-
.../sdk/io/iceberg/WriteGroupedRowsToFiles.java | 3 +-
.../sdk/io/iceberg/WriteUngroupedRowsToFiles.java | 5 +-
.../beam/sdk/io/iceberg/FileWriteResultTest.java | 166 ----------------
7 files changed, 61 insertions(+), 346 deletions(-)
diff --git a/sdks/java/io/iceberg/build.gradle
b/sdks/java/io/iceberg/build.gradle
index e721b98f102..f82284e3b39 100644
--- a/sdks/java/io/iceberg/build.gradle
+++ b/sdks/java/io/iceberg/build.gradle
@@ -52,7 +52,6 @@ dependencies {
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
- implementation library.java.avro
implementation library.java.hadoop_common
testImplementation library.java.hadoop_client
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index e4ba6000182..bb42df5a933 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -90,7 +90,7 @@ class AppendFilesToTables
Table table =
getCatalog().loadTable(TableIdentifier.parse(element.getKey()));
AppendFiles update = table.newAppend();
for (FileWriteResult writtenFile : element.getValue()) {
- update.appendFile(writtenFile.getDataFile());
+ update.appendManifest(writtenFile.getManifestFile());
}
update.commit();
out.outputWithTimestamp(
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
index c12febc03f4..2459c0befde 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/FileWriteResult.java
@@ -17,197 +17,69 @@
*/
package org.apache.beam.sdk.io.iceberg;
-import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-
import com.google.auto.value.AutoValue;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.avro.Schema;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.CoderProvider;
-import org.apache.beam.sdk.coders.CoderProviders;
-import org.apache.beam.sdk.coders.DefaultCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.avro.AvroEncoderUtil;
-import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@AutoValue
-@DefaultCoder(FileWriteResult.FileWriteResultCoder.class)
+@DefaultSchema(AutoValueSchema.class)
abstract class FileWriteResult {
- public abstract TableIdentifier getTableIdentifier();
- public abstract PartitionSpec getPartitionSpec();
+ private transient @MonotonicNonNull TableIdentifier cachedTableIdentifier;
+ private transient @MonotonicNonNull ManifestFile cachedManifestFile;
- public abstract DataFile getDataFile();
+ abstract String getTableIdentifierString();
- public static Builder builder() {
- return new AutoValue_FileWriteResult.Builder();
- }
+ @SuppressWarnings("mutable")
+ abstract byte[] getManifestFileBytes();
- @AutoValue.Builder
- abstract static class Builder {
- public abstract Builder setTableIdentifier(TableIdentifier tableId);
-
- public abstract Builder setPartitionSpec(PartitionSpec partitionSpec);
-
- public abstract Builder setDataFile(DataFile dataFiles);
-
- public abstract FileWriteResult build();
- }
-
- public static class FileWriteResultCoder extends
StructuredCoder<FileWriteResult> {
- static final int VERSION = 0;
- private static final FileWriteResultCoder SINGLETON = new
FileWriteResultCoder();
-
- private static final Coder<String> tableIdentifierCoder =
StringUtf8Coder.of();
- private static final Coder<PartitionSpec> partitionSpecCoder =
- SerializableCoder.of(PartitionSpec.class);
- private static final Coder<byte[]> dataFileBytesCoder =
ByteArrayCoder.of();
-
- private static Schema getDataFileAvroSchema(FileWriteResult
fileWriteResult) {
- Types.StructType partitionType =
fileWriteResult.getPartitionSpec().partitionType();
- Types.StructType dataFileStruct = DataFile.getType(partitionType);
- Map<Types.StructType, String> dataFileNames =
- ImmutableMap.of(
- dataFileStruct, "org.apache.iceberg.GenericDataFile",
- partitionType, "org.apache.iceberg.PartitionData");
- return AvroSchemaUtil.convert(dataFileStruct, dataFileNames);
- }
-
- @Override
- public void encode(FileWriteResult value, OutputStream outStream)
- throws CoderException, IOException {
- // "version" of this coder.
- // If breaking changes are introduced (e.g. from Beam, Iceberg, Avro,
etc..),
- // then update this version and create a fork in decode() below for the
new decode logic.
- // This helps keep the pipeline update-compatible
- outStream.write(VERSION);
-
- tableIdentifierCoder.encode(value.getTableIdentifier().toString(),
outStream);
- partitionSpecCoder.encode(value.getPartitionSpec(), outStream);
- dataFileBytesCoder.encode(
- AvroEncoderUtil.encode(value.getDataFile(),
getDataFileAvroSchema(value)), outStream);
- }
-
- @Override
- public FileWriteResult decode(InputStream inStream) throws CoderException,
IOException {
- // Forking logic can be added here depending on the version of this coder
- assert inStream.read() == 0;
-
- TableIdentifier tableId =
TableIdentifier.parse(tableIdentifierCoder.decode(inStream));
- PartitionSpec partitionSpec = partitionSpecCoder.decode(inStream);
- DataFile dataFile =
- checkArgumentNotNull(
- AvroEncoderUtil.decode(dataFileBytesCoder.decode(inStream)),
- "Decoding of dataFile resulted in null");
- return FileWriteResult.builder()
- .setTableIdentifier(tableId)
- .setDataFile(dataFile)
- .setPartitionSpec(partitionSpec)
- .build();
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Collections.emptyList();
- }
-
- @Override
- public Object structuralValue(FileWriteResult fileWriteResult) {
- return new FileWriteResultDeepEqualityWrapper(fileWriteResult);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {}
-
- @Override
- public TypeDescriptor<FileWriteResult> getEncodedTypeDescriptor() {
- return TypeDescriptor.of(FileWriteResult.class);
+ @SchemaIgnore
+ public TableIdentifier getTableIdentifier() {
+ if (cachedTableIdentifier == null) {
+ cachedTableIdentifier =
TableIdentifier.parse(getTableIdentifierString());
}
+ return cachedTableIdentifier;
+ }
- public static FileWriteResultCoder of() {
- return SINGLETON;
+ @SchemaIgnore
+ public ManifestFile getManifestFile() {
+ if (cachedManifestFile == null) {
+ try {
+ cachedManifestFile = ManifestFiles.decode(getManifestFileBytes());
+ } catch (IOException exc) {
+ throw new RuntimeException("Error decoding manifest file bytes");
+ }
}
+ return cachedManifestFile;
+ }
- @SuppressWarnings("unused") // used via `DefaultCoder` annotation
- public static CoderProvider getCoderProvider() {
- return CoderProviders.forCoder(
- TypeDescriptor.of(FileWriteResult.class), FileWriteResultCoder.of());
- }
+ public static Builder builder() {
+ return new AutoValue_FileWriteResult.Builder();
}
- private static class FileWriteResultDeepEqualityWrapper {
- private final FileWriteResult fileWriteResult;
+ @AutoValue.Builder
+ abstract static class Builder {
- private FileWriteResultDeepEqualityWrapper(FileWriteResult
fileWriteResult) {
- this.fileWriteResult = fileWriteResult;
- }
+ abstract Builder setTableIdentifierString(String tableIdString);
- @Override
- public boolean equals(@Nullable Object obj) {
- if (obj == this) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (!(obj instanceof FileWriteResultDeepEqualityWrapper)) {
- return false;
- }
- FileWriteResultDeepEqualityWrapper other =
(FileWriteResultDeepEqualityWrapper) obj;
+ abstract Builder setManifestFileBytes(byte[] manifestFileBytes);
- return Objects.equals(
- fileWriteResult.getTableIdentifier(),
other.fileWriteResult.getTableIdentifier())
- && Objects.equals(
- fileWriteResult.getPartitionSpec(),
other.fileWriteResult.getPartitionSpec())
- && dataFilesEqual(fileWriteResult.getDataFile(),
other.fileWriteResult.getDataFile());
+ @SchemaIgnore
+ public Builder setTableIdentifier(TableIdentifier tableId) {
+ return setTableIdentifierString(tableId.toString());
}
- private boolean dataFilesEqual(DataFile first, DataFile second) {
- return Objects.equals(first.pos(), second.pos())
- && first.specId() == second.specId()
- && Objects.equals(first.content(), second.content())
- && Objects.equals(first.path(), second.path())
- && Objects.equals(first.format(), second.format())
- && Objects.equals(first.partition(), second.partition())
- && first.recordCount() == second.recordCount()
- && first.fileSizeInBytes() == second.fileSizeInBytes()
- && Objects.equals(first.columnSizes(), second.columnSizes())
- && Objects.equals(first.valueCounts(), second.valueCounts())
- && Objects.equals(first.nullValueCounts(), second.nullValueCounts())
- && Objects.equals(first.nanValueCounts(), second.nanValueCounts())
- && Objects.equals(first.lowerBounds(), second.lowerBounds())
- && Objects.equals(first.upperBounds(), second.upperBounds())
- && Objects.equals(first.keyMetadata(), second.keyMetadata())
- && Objects.equals(first.splitOffsets(), second.splitOffsets())
- && Objects.equals(first.equalityFieldIds(),
second.equalityFieldIds())
- && Objects.equals(first.sortOrderId(), second.sortOrderId())
- && Objects.equals(first.dataSequenceNumber(),
second.dataSequenceNumber())
- && Objects.equals(first.fileSequenceNumber(),
second.fileSequenceNumber());
+ @SchemaIgnore
+ public Builder setManifestFile(ManifestFile manifestFile) throws
IOException {
+ return setManifestFileBytes(ManifestFiles.encode(manifestFile));
}
- @Override
- public int hashCode() {
- return Objects.hash(
- fileWriteResult.getTableIdentifier(),
- fileWriteResult.getPartitionSpec(),
- fileWriteResult.getDataFile());
- }
+ public abstract FileWriteResult build();
}
}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index aa203eb6eb6..859310bdcec 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -23,6 +23,9 @@ import java.io.IOException;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.catalog.Catalog;
@@ -37,6 +40,7 @@ class RecordWriter {
private final DataWriter<Record> icebergDataWriter;
private final Table table;
+ private final String absoluteFilename;
RecordWriter(Catalog catalog, IcebergDestination destination, String
filename)
throws IOException {
@@ -46,9 +50,9 @@ class RecordWriter {
RecordWriter(Table table, FileFormat fileFormat, String filename) throws
IOException {
this.table = table;
-
- String absoluteFilename = table.location() + "/" + filename;
+ this.absoluteFilename = table.location() + "/" + filename;
OutputFile outputFile = table.io().newOutputFile(absoluteFilename);
+
switch (fileFormat) {
case AVRO:
icebergDataWriter =
@@ -92,7 +96,15 @@ class RecordWriter {
return icebergDataWriter.length();
}
- public DataFile dataFile() {
- return icebergDataWriter.toDataFile();
+ public ManifestFile getManifestFile() throws IOException {
+ String manifestFilename = FileFormat.AVRO.addExtension(absoluteFilename +
".manifest");
+ OutputFile outputFile = table.io().newOutputFile(manifestFilename);
+ ManifestWriter<DataFile> manifestWriter;
+ try (ManifestWriter<DataFile> openWriter =
ManifestFiles.write(getTable().spec(), outputFile)) {
+ openWriter.add(icebergDataWriter.toDataFile());
+ manifestWriter = openWriter;
+ }
+
+ return manifestWriter.toManifestFile();
}
}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
index 731a9fefb49..c1126351944 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java
@@ -95,8 +95,7 @@ class WriteGroupedRowsToFiles
c.output(
FileWriteResult.builder()
.setTableIdentifier(destination.getTableIdentifier())
- .setDataFile(writer.dataFile())
- .setPartitionSpec(writer.getTable().spec())
+ .setManifestFile(writer.getManifestFile())
.build());
}
}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
index 917aab9e55c..a00f3de4bb4 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java
@@ -267,7 +267,7 @@ class WriteUngroupedRowsToFiles
out.get(WRITTEN_FILES_TAG)
.output(
FileWriteResult.builder()
- .setDataFile(writer.dataFile())
+ .setManifestFile(writer.getManifestFile())
.setTableIdentifier(destination.getTableIdentifier())
.build());
writer = createAndInsertWriter(destination, window);
@@ -307,9 +307,8 @@ class WriteUngroupedRowsToFiles
getWindows().get(destination), "internal error: no windows
for destination");
c.output(
FileWriteResult.builder()
- .setDataFile(writer.dataFile())
+ .setManifestFile(writer.getManifestFile())
.setTableIdentifier(destination.getTableIdentifier())
- .setPartitionSpec(writer.getTable().spec())
.build(),
window.maxTimestamp(),
window);
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java
deleted file mode 100644
index 64413059315..00000000000
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FileWriteResultTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.iceberg;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.UUID;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.commons.compress.utils.Lists;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.types.Types;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class FileWriteResultTest implements Serializable {
-
- @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
-
- @Rule public TestDataWarehouse warehouse = new
TestDataWarehouse(TEMPORARY_FOLDER, "default");
-
- private static final Coder<FileWriteResult> TEST_CODER =
- FileWriteResult.FileWriteResultCoder.of();
-
- private List<FileWriteResult> getTestValues() throws Exception {
- TableIdentifier tableId =
- TableIdentifier.of("default", "table" +
Long.toString(UUID.randomUUID().hashCode(), 16));
-
- // Create a table so we can have some DataFile objects
- Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
- List<FileWriteResult> values = Lists.newArrayList();
-
- // A parquet file
- RecordWriter writer =
- new RecordWriter(table, FileFormat.PARQUET,
TEMPORARY_FOLDER.newFile().toString());
- writer.write(
-
Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
- .addValues(42L, "bizzle")
- .build());
- writer.close();
- DataFile dataFile = writer.dataFile();
- values.add(
- FileWriteResult.builder()
- .setDataFile(dataFile)
- .setPartitionSpec(table.spec())
- .setTableIdentifier(tableId)
- .build());
-
- // An avro file
- writer = new RecordWriter(table, FileFormat.AVRO,
TEMPORARY_FOLDER.newFile().toString());
- writer.write(
-
Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
- .addValues(42L, "bizzle")
- .build());
- writer.close();
- dataFile = writer.dataFile();
- values.add(
- FileWriteResult.builder()
- .setDataFile(dataFile)
- .setPartitionSpec(table.spec())
- .setTableIdentifier(tableId)
- .build());
-
- // Parquet file with a different schema
- TableIdentifier tableId2 =
- TableIdentifier.of(
- "default", "othertable" +
Long.toString(UUID.randomUUID().hashCode(), 16));
- Schema schema =
- new Schema(
- required(1, "id", Types.LongType.get()),
- optional(2, "data", Types.StringType.get()),
- optional(
- 3,
- "extra",
- Types.StructType.of(
- Types.NestedField.required(4, "inner",
Types.BinaryType.get()))));
- Table table2 = warehouse.createTable(tableId2, schema);
-
- // A parquet file in this other table
- writer = new RecordWriter(table2, FileFormat.PARQUET,
TEMPORARY_FOLDER.newFile().toString());
- writer.write(
-
Row.withSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(schema))
- .addValues(
- 42L,
- "bizzle",
- Row.withSchema(
- org.apache.beam.sdk.schemas.Schema.of(
- org.apache.beam.sdk.schemas.Schema.Field.of(
- "inner",
org.apache.beam.sdk.schemas.Schema.FieldType.BYTES)))
- .addValues(new byte[] {0xa})
- .build())
- .build());
- writer.close();
- DataFile dataFile2 = writer.dataFile();
- values.add(
- FileWriteResult.builder()
- .setDataFile(dataFile2)
- .setPartitionSpec(table2.spec())
- .setTableIdentifier(tableId2)
- .build());
-
- return values;
- }
-
- @Test
- public void testDecodeEncodeEqual() throws Exception {
- for (FileWriteResult value : getTestValues()) {
- CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, value);
- }
- }
-
- @Test
- public void testDecodeEncodeVersionNumber() throws Exception {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayInputStream in;
- for (FileWriteResult value : getTestValues()) {
- TEST_CODER.encode(value, out);
- in = new ByteArrayInputStream(out.toByteArray());
-
- assertEquals(FileWriteResult.FileWriteResultCoder.VERSION, in.read());
- }
- }
-
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testEncodedTypeDescriptor() throws Exception {
- assertThat(
- TEST_CODER.getEncodedTypeDescriptor(),
equalTo(TypeDescriptor.of(FileWriteResult.class)));
- }
-}