This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e160bb89c9 Flink: Backport Support writing DVs in IcebergSink to Flink
2.0 and 1.20 (#14390)
e160bb89c9 is described below
commit e160bb89c949b73685d908823b439b416bec272e
Author: GuoYu <[email protected]>
AuthorDate: Tue Oct 21 23:49:47 2025 +0800
Flink: Backport Support writing DVs in IcebergSink to Flink 2.0 and 1.20
(#14390)
Backports #14197 (support for writing DVs in IcebergSink) to the Flink 1.20 and 2.0 modules.
---
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 10 +++++----
.../iceberg/flink/sink/FlinkManifestUtil.java | 8 ++++++--
.../iceberg/flink/sink/IcebergFilesCommitter.java | 6 +++++-
.../iceberg/flink/sink/IcebergWriteAggregator.java | 6 +++++-
.../iceberg/flink/sink/PartitionedDeltaWriter.java | 9 +++++---
.../flink/sink/RowDataTaskWriterFactory.java | 9 ++++++--
.../flink/sink/UnpartitionedDeltaWriter.java | 9 +++++---
.../sink/dynamic/DynamicWriteResultAggregator.java | 3 ++-
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 6 ++++++
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 24 ++++++++++++++--------
.../TestCommittableToTableChangeConverter.java | 9 ++++++--
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 16 ++++++++++-----
.../iceberg/flink/sink/TestFlinkManifest.java | 10 ++++++---
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 10 +++++----
.../iceberg/flink/sink/FlinkManifestUtil.java | 8 ++++++--
.../iceberg/flink/sink/IcebergFilesCommitter.java | 6 +++++-
.../iceberg/flink/sink/IcebergWriteAggregator.java | 6 +++++-
.../iceberg/flink/sink/PartitionedDeltaWriter.java | 9 +++++---
.../flink/sink/RowDataTaskWriterFactory.java | 9 ++++++--
.../flink/sink/UnpartitionedDeltaWriter.java | 9 +++++---
.../sink/dynamic/DynamicWriteResultAggregator.java | 3 ++-
.../iceberg/flink/sink/dynamic/DynamicWriter.java | 6 ++++++
.../org/apache/iceberg/flink/TestFlinkUpsert.java | 24 ++++++++++++++--------
.../TestCommittableToTableChangeConverter.java | 9 ++++++--
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 16 ++++++++++-----
.../iceberg/flink/sink/TestFlinkManifest.java | 10 ++++++---
26 files changed, 178 insertions(+), 72 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index f68eff6912..33a09705e7 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningDVWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -57,8 +58,9 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
- super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+ boolean upsert,
+ boolean useDv) {
+ super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize,
useDv);
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema,
Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
@@ -109,8 +111,8 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
}
protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
- RowDataDeltaWriter(PartitionKey partition) {
- super(partition, schema, deleteSchema, DeleteGranularity.FILE);
+ RowDataDeltaWriter(PartitionKey partition, PartitioningDVWriter<RowData>
dvFileWriter) {
+ super(partition, schema, deleteSchema, DeleteGranularity.FILE,
dvFileWriter);
}
@Override
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 0eeedf2659..1736d91b1b 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -96,7 +96,10 @@ public class FlinkManifestUtil {
* partition spec
*/
public static DeltaManifests writeCompletedFiles(
- WriteResult result, Supplier<OutputFile> outputFileSupplier,
PartitionSpec spec)
+ WriteResult result,
+ Supplier<OutputFile> outputFileSupplier,
+ PartitionSpec spec,
+ int formatVersion)
throws IOException {
ManifestFile dataManifest = null;
@@ -113,7 +116,8 @@ public class FlinkManifestUtil {
OutputFile deleteManifestFile = outputFileSupplier.get();
ManifestWriter<DeleteFile> deleteManifestWriter =
- ManifestFiles.writeDeleteManifest(FORMAT_V2, spec,
deleteManifestFile, DUMMY_SNAPSHOT_ID);
+ ManifestFiles.writeDeleteManifest(
+ formatVersion, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
for (DeleteFile deleteFile : result.deleteFiles()) {
writer.add(deleteFile);
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b510dce28b..f78a705e66 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -46,6 +46,7 @@ import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -443,7 +444,10 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- result, () -> manifestOutputFileFactory.create(checkpointId),
spec);
+ result,
+ () -> manifestOutputFileFactory.create(checkpointId),
+ spec,
+ TableUtil.formatVersion(table));
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
index 794ade5779..7f21c1c372 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
@@ -28,6 +28,7 @@ import
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -110,7 +111,10 @@ class IcebergWriteAggregator extends
AbstractStreamOperator<CommittableMessage<I
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- result, () ->
icebergManifestOutputFileFactory.create(checkpointId), table.spec());
+ result,
+ () -> icebergManifestOutputFileFactory.create(checkpointId),
+ table.spec(),
+ TableUtil.formatVersion(table));
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index afbc14b7f1..5e597d8e71 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -50,7 +50,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
+ boolean upsert,
+ boolean useDv) {
super(
spec,
format,
@@ -61,7 +62,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
this.partitionKey = new PartitionKey(spec, schema);
}
@@ -74,7 +76,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
// NOTICE: we need to copy a new partition key here, in case of messing
up the keys in
// writers.
PartitionKey copiedKey = partitionKey.copy();
- writer = new RowDataDeltaWriter(copiedKey);
+ writer = new RowDataDeltaWriter(copiedKey, dvFileWriter());
writers.put(copiedKey, writer);
}
@@ -84,6 +86,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
@Override
public void close() {
try {
+ super.close();
Tasks.foreach(writers.values())
.throwFailureWhenFinished()
.noRetry()
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index ef2c795e23..bc3bc51ced 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
@@ -52,6 +53,7 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
private final Set<Integer> equalityFieldIds;
private final boolean upsert;
private final FileWriterFactory<RowData> fileWriterFactory;
+ private boolean useDv;
private transient OutputFileFactory outputFileFactory;
@@ -170,6 +172,7 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
}
refreshTable();
+ this.useDv = TableUtil.formatVersion(table) > 2;
this.outputFileFactory =
OutputFileFactory.builderFor(table, taskId, attemptId)
@@ -221,7 +224,8 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
} else {
return new PartitionedDeltaWriter(
spec,
@@ -233,7 +237,8 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
}
}
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index e709206c94..9d749d3062 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -42,7 +42,8 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
+ boolean upsert,
+ boolean useDv) {
super(
spec,
format,
@@ -53,8 +54,9 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
- this.writer = new RowDataDeltaWriter(null);
+ upsert,
+ useDv);
+ this.writer = new RowDataDeltaWriter(null, dvFileWriter());
}
@Override
@@ -65,5 +67,6 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
@Override
public void close() throws IOException {
writer.close();
+ super.close();
}
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index b92d32fcc4..77bd2a0f97 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -141,7 +141,8 @@ class DynamicWriteResultAggregator
FlinkManifestUtil.writeCompletedFiles(
result,
() -> outputFileFactory(key.tableName()).create(checkpointId),
- spec(key.tableName(), key.specId()));
+ spec(key.tableName(), key.specId()),
+ 2);
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index e99e6e72da..5ed9da8623 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -116,6 +117,11 @@ class DynamicWriter implements
CommittingSinkWriter<DynamicRecordInternal, Dynam
Preconditions.checkState(
!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when
configuring to use UPSERT data.");
+
+ Preconditions.checkArgument(
+ !(TableUtil.formatVersion(table) > 2),
+ "Dynamic Sink writer does not support upsert
mode in tables (V3+)");
+
if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField :
table.spec().fields()) {
Preconditions.checkState(
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index c5a7ec4bee..e6ca9c5c74 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -48,21 +48,27 @@ public class TestFlinkUpsert extends CatalogTestBase {
@Parameter(index = 3)
private boolean isStreamingJob;
+ @Parameter(index = 4)
+ private int formatVersion;
+
private final Map<String, String> tableUpsertProps = Maps.newHashMap();
private TableEnvironment tEnv;
- @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2},
isStreaming={3}")
+ @Parameters(
+ name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3},
formatVersion={4} ")
public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO,
FileFormat.ORC}) {
- for (Boolean isStreaming : new Boolean[] {true, false}) {
- // Only test with one catalog as this is a file operation concern.
- // FlinkCatalogTestBase requires the catalog name start with
testhadoop if using hadoop
- // catalog.
- String catalogName = "testhadoop";
- Namespace baseNamespace = Namespace.of("default");
- parameters.add(new Object[] {catalogName, baseNamespace, format,
isStreaming});
+ for (int version : org.apache.iceberg.TestHelpers.V2_AND_ABOVE) {
+ for (Boolean isStreaming : new Boolean[] {true, false}) {
+ // Only test with one catalog as this is a file operation concern.
+ // FlinkCatalogTestBase requires the catalog name start with
testhadoop if using hadoop
+ // catalog.
+ String catalogName = "testhadoop";
+ Namespace baseNamespace = Namespace.of("default");
+ parameters.add(new Object[] {catalogName, baseNamespace, format,
isStreaming, version});
+ }
}
}
return parameters;
@@ -98,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
- tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+ tableUpsertProps.put(TableProperties.FORMAT_VERSION,
String.valueOf(formatVersion));
tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
index 0c7a47c232..c39e09fd86 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.io.FileIO;
@@ -116,7 +117,8 @@ class TestCommittableToTableChangeConverter {
.addDeleteFiles(posDeleteFile, eqDeleteFile)
.build();
DeltaManifests deltaManifests =
- FlinkManifestUtil.writeCompletedFiles(writeResult, () ->
factory.create(1), table.spec());
+ FlinkManifestUtil.writeCompletedFiles(
+ writeResult, () -> factory.create(1), table.spec(),
TableUtil.formatVersion(table));
IcebergCommittable committable =
new IcebergCommittable(
SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -297,7 +299,10 @@ class TestCommittableToTableChangeConverter {
.build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- writeResult, () -> factory.create(checkpointId), table.spec());
+ writeResult,
+ () -> factory.create(checkpointId),
+ table.spec(),
+ TableUtil.formatVersion(table));
IcebergCommittable committable =
new IcebergCommittable(
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index a21c51c378..89f642af1c 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -80,11 +81,16 @@ public class TestDeltaTaskWriter extends TestBase {
private FileFormat format;
@Parameters(name = "formatVersion = {0}, fileFormat = {1}")
- protected static List<Object> parameters() {
- return Arrays.asList(
- new Object[] {2, FileFormat.AVRO},
- new Object[] {2, FileFormat.ORC},
- new Object[] {2, FileFormat.PARQUET});
+ protected static List<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format :
+ new FileFormat[] {FileFormat.AVRO, FileFormat.ORC,
FileFormat.PARQUET}) {
+ for (int version : TestHelpers.V2_AND_ABOVE) {
+ parameters.add(new Object[] {version, format});
+ }
+ }
+
+ return parameters;
}
@Override
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c6dc984513..4bbd523ec0 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.WriteResult;
@@ -105,7 +106,8 @@ public class TestFlinkManifest {
.addDeleteFiles(posDeleteFiles)
.build(),
() -> factory.create(curCkpId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
WriteResult result =
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(),
table.specs());
@@ -141,7 +143,8 @@ public class TestFlinkManifest {
FlinkManifestUtil.writeCompletedFiles(
WriteResult.builder().addDataFiles(dataFiles).build(),
() -> factory.create(checkpointId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
assertThat(deltaManifests.dataManifest()).isNotNull();
assertThat(deltaManifests.deleteManifest()).isNull();
@@ -180,7 +183,8 @@ public class TestFlinkManifest {
.addDeleteFiles(posDeleteFiles)
.build(),
() -> factory.create(checkpointId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
byte[] versionedSerializeData =
SimpleVersionedSerialization.writeVersionAndSerialize(
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index f68eff6912..33a09705e7 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningDVWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -57,8 +58,9 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
- super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize);
+ boolean upsert,
+ boolean useDv) {
+ super(spec, format, fileWriterFactory, fileFactory, io, targetFileSize,
useDv);
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema,
Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
@@ -109,8 +111,8 @@ abstract class BaseDeltaTaskWriter extends
BaseTaskWriter<RowData> {
}
protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
- RowDataDeltaWriter(PartitionKey partition) {
- super(partition, schema, deleteSchema, DeleteGranularity.FILE);
+ RowDataDeltaWriter(PartitionKey partition, PartitioningDVWriter<RowData>
dvFileWriter) {
+ super(partition, schema, deleteSchema, DeleteGranularity.FILE,
dvFileWriter);
}
@Override
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 0eeedf2659..1736d91b1b 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -96,7 +96,10 @@ public class FlinkManifestUtil {
* partition spec
*/
public static DeltaManifests writeCompletedFiles(
- WriteResult result, Supplier<OutputFile> outputFileSupplier,
PartitionSpec spec)
+ WriteResult result,
+ Supplier<OutputFile> outputFileSupplier,
+ PartitionSpec spec,
+ int formatVersion)
throws IOException {
ManifestFile dataManifest = null;
@@ -113,7 +116,8 @@ public class FlinkManifestUtil {
OutputFile deleteManifestFile = outputFileSupplier.get();
ManifestWriter<DeleteFile> deleteManifestWriter =
- ManifestFiles.writeDeleteManifest(FORMAT_V2, spec,
deleteManifestFile, DUMMY_SNAPSHOT_ID);
+ ManifestFiles.writeDeleteManifest(
+ formatVersion, spec, deleteManifestFile, DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DeleteFile> writer = deleteManifestWriter) {
for (DeleteFile deleteFile : result.deleteFiles()) {
writer.add(deleteFile);
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 89432cff2b..b9ac0f9906 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -47,6 +47,7 @@ import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -446,7 +447,10 @@ class IcebergFilesCommitter extends
AbstractStreamOperator<Void>
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- result, () -> manifestOutputFileFactory.create(checkpointId),
spec);
+ result,
+ () -> manifestOutputFileFactory.create(checkpointId),
+ spec,
+ TableUtil.formatVersion(table));
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
index 794ade5779..7f21c1c372 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergWriteAggregator.java
@@ -28,6 +28,7 @@ import
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -110,7 +111,10 @@ class IcebergWriteAggregator extends
AbstractStreamOperator<CommittableMessage<I
WriteResult result = WriteResult.builder().addAll(writeResults).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- result, () ->
icebergManifestOutputFileFactory.create(checkpointId), table.spec());
+ result,
+ () -> icebergManifestOutputFileFactory.create(checkpointId),
+ table.spec(),
+ TableUtil.formatVersion(table));
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index afbc14b7f1..5e597d8e71 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -50,7 +50,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
+ boolean upsert,
+ boolean useDv) {
super(
spec,
format,
@@ -61,7 +62,8 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
this.partitionKey = new PartitionKey(spec, schema);
}
@@ -74,7 +76,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
// NOTICE: we need to copy a new partition key here, in case of messing
up the keys in
// writers.
PartitionKey copiedKey = partitionKey.copy();
- writer = new RowDataDeltaWriter(copiedKey);
+ writer = new RowDataDeltaWriter(copiedKey, dvFileWriter());
writers.put(copiedKey, writer);
}
@@ -84,6 +86,7 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
@Override
public void close() {
try {
+ super.close();
Tasks.foreach(writers.values())
.throwFailureWhenFinished()
.noRetry()
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index ef2c795e23..bc3bc51ced 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.FileWriterFactory;
@@ -52,6 +53,7 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
private final Set<Integer> equalityFieldIds;
private final boolean upsert;
private final FileWriterFactory<RowData> fileWriterFactory;
+ private boolean useDv;
private transient OutputFileFactory outputFileFactory;
@@ -170,6 +172,7 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
}
refreshTable();
+ this.useDv = TableUtil.formatVersion(table) > 2;
this.outputFileFactory =
OutputFileFactory.builderFor(table, taskId, attemptId)
@@ -221,7 +224,8 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
} else {
return new PartitionedDeltaWriter(
spec,
@@ -233,7 +237,8 @@ public class RowDataTaskWriterFactory implements
TaskWriterFactory<RowData> {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
+ upsert,
+ useDv);
}
}
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index e709206c94..9d749d3062 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -42,7 +42,8 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
Schema schema,
RowType flinkSchema,
Set<Integer> equalityFieldIds,
- boolean upsert) {
+ boolean upsert,
+ boolean useDv) {
super(
spec,
format,
@@ -53,8 +54,9 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
schema,
flinkSchema,
equalityFieldIds,
- upsert);
- this.writer = new RowDataDeltaWriter(null);
+ upsert,
+ useDv);
+ this.writer = new RowDataDeltaWriter(null, dvFileWriter());
}
@Override
@@ -65,5 +67,6 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
@Override
public void close() throws IOException {
writer.close();
+ super.close();
}
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index b92d32fcc4..77bd2a0f97 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -141,7 +141,8 @@ class DynamicWriteResultAggregator
FlinkManifestUtil.writeCompletedFiles(
result,
() -> outputFileFactory(key.tableName()).create(checkpointId),
- spec(key.tableName(), key.specId()));
+ spec(key.tableName(), key.specId()),
+ 2);
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index e99e6e72da..5ed9da8623 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -116,6 +117,11 @@ class DynamicWriter implements CommittingSinkWriter<DynamicRecordInternal, Dynam
Preconditions.checkState(
!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data.");
+
+ Preconditions.checkArgument(
+ !(TableUtil.formatVersion(table) > 2),
+ "Dynamic Sink writer does not support upsert mode in tables (V3+)");
+
if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField :
table.spec().fields()) {
Preconditions.checkState(
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
index c5a7ec4bee..e6ca9c5c74 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/TestFlinkUpsert.java
@@ -48,21 +48,27 @@ public class TestFlinkUpsert extends CatalogTestBase {
@Parameter(index = 3)
private boolean isStreamingJob;
+ @Parameter(index = 4)
+ private int formatVersion;
+
private final Map<String, String> tableUpsertProps = Maps.newHashMap();
private TableEnvironment tEnv;
- @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
+ @Parameters(
+ name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, formatVersion={4} ")
public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.PARQUET, FileFormat.AVRO, FileFormat.ORC}) {
- for (Boolean isStreaming : new Boolean[] {true, false}) {
- // Only test with one catalog as this is a file operation concern.
- // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop
- // catalog.
- String catalogName = "testhadoop";
- Namespace baseNamespace = Namespace.of("default");
- parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+ for (int version : org.apache.iceberg.TestHelpers.V2_AND_ABOVE) {
+ for (Boolean isStreaming : new Boolean[] {true, false}) {
+ // Only test with one catalog as this is a file operation concern.
+ // FlinkCatalogTestBase requires the catalog name start with testhadoop if using hadoop
+ // catalog.
+ String catalogName = "testhadoop";
+ Namespace baseNamespace = Namespace.of("default");
+ parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, version});
+ }
}
}
return parameters;
@@ -98,7 +104,7 @@ public class TestFlinkUpsert extends CatalogTestBase {
sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
- tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+ tableUpsertProps.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
}
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
index 0c7a47c232..c39e09fd86 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestCommittableToTableChangeConverter.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.io.FileIO;
@@ -116,7 +117,8 @@ class TestCommittableToTableChangeConverter {
.addDeleteFiles(posDeleteFile, eqDeleteFile)
.build();
DeltaManifests deltaManifests =
- FlinkManifestUtil.writeCompletedFiles(writeResult, () -> factory.create(1), table.spec());
+ FlinkManifestUtil.writeCompletedFiles(
+ writeResult, () -> factory.create(1), table.spec(), TableUtil.formatVersion(table));
IcebergCommittable committable =
new IcebergCommittable(
SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -297,7 +299,10 @@ class TestCommittableToTableChangeConverter {
.build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- writeResult, () -> factory.create(checkpointId), table.spec());
+ writeResult,
+ () -> factory.create(checkpointId),
+ table.spec(),
+ TableUtil.formatVersion(table));
IcebergCommittable committable =
new IcebergCommittable(
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index a21c51c378..89f642af1c 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.TestTables;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
@@ -80,11 +81,16 @@ public class TestDeltaTaskWriter extends TestBase {
private FileFormat format;
@Parameters(name = "formatVersion = {0}, fileFormat = {1}")
- protected static List<Object> parameters() {
- return Arrays.asList(
- new Object[] {2, FileFormat.AVRO},
- new Object[] {2, FileFormat.ORC},
- new Object[] {2, FileFormat.PARQUET});
+ protected static List<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ for (FileFormat format :
+ new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
+ for (int version : TestHelpers.V2_AND_ABOVE) {
+ parameters.add(new Object[] {version, format});
+ }
+ }
+
+ return parameters;
}
@Override
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index c6dc984513..4bbd523ec0 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -39,6 +39,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.WriteResult;
@@ -105,7 +106,8 @@ public class TestFlinkManifest {
.addDeleteFiles(posDeleteFiles)
.build(),
() -> factory.create(curCkpId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
WriteResult result =
FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
@@ -141,7 +143,8 @@ public class TestFlinkManifest {
FlinkManifestUtil.writeCompletedFiles(
WriteResult.builder().addDataFiles(dataFiles).build(),
() -> factory.create(checkpointId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
assertThat(deltaManifests.dataManifest()).isNotNull();
assertThat(deltaManifests.deleteManifest()).isNull();
@@ -180,7 +183,8 @@ public class TestFlinkManifest {
.addDeleteFiles(posDeleteFiles)
.build(),
() -> factory.create(checkpointId),
- table.spec());
+ table.spec(),
+ TableUtil.formatVersion(table));
byte[] versionedSerializeData =
SimpleVersionedSerialization.writeVersionAndSerialize(