This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 67e1975ac6 Flink: Fix writeDataFiles with hardcoded formatVersion
(#14570)
67e1975ac6 is described below
commit 67e1975ac65e2690fc9599d0f5a91c2f0563e337
Author: GuoYu <[email protected]>
AuthorDate: Wed Nov 12 21:10:58 2025 +0800
Flink: Fix writeDataFiles with hardcoded formatVersion (#14570)
---
.../org/apache/iceberg/flink/sink/FlinkManifestUtil.java | 12 ++++++++----
.../org/apache/iceberg/flink/sink/TestFlinkManifest.java | 3 ++-
.../org/apache/iceberg/flink/sink/FlinkManifestUtil.java | 12 ++++++++----
.../org/apache/iceberg/flink/sink/TestFlinkManifest.java | 3 ++-
.../org/apache/iceberg/flink/sink/FlinkManifestUtil.java | 12 ++++++++----
.../org/apache/iceberg/flink/sink/TestFlinkManifest.java | 3 ++-
6 files changed, 30 insertions(+), 15 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 1736d91b1b..c7e79ed8db 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -41,15 +41,15 @@ import org.slf4j.LoggerFactory;
public class FlinkManifestUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkManifestUtil.class);
- private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;
private FlinkManifestUtil() {}
static ManifestFile writeDataFiles(
- OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
throws IOException {
+ OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles, int
formatVersion)
+ throws IOException {
ManifestWriter<DataFile> writer =
- ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+ ManifestFiles.write(formatVersion, spec, outputFile,
DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DataFile> closeableWriter = writer) {
closeableWriter.addAll(dataFiles);
@@ -108,7 +108,11 @@ public class FlinkManifestUtil {
// Write the completed data files into a newly created data manifest file.
if (result.dataFiles() != null && result.dataFiles().length > 0) {
dataManifest =
- writeDataFiles(outputFileSupplier.get(), spec,
Lists.newArrayList(result.dataFiles()));
+ writeDataFiles(
+ outputFileSupplier.get(),
+ spec,
+ Lists.newArrayList(result.dataFiles()),
+ formatVersion);
}
// Write the completed delete files into a newly created delete manifest
file.
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4bbd523ec0..9ae435c263 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -213,7 +213,8 @@ public class TestFlinkManifest {
List<DataFile> dataFiles = generateDataFiles(10);
ManifestFile manifest =
- FlinkManifestUtil.writeDataFiles(factory.create(checkpointId),
table.spec(), dataFiles);
+ FlinkManifestUtil.writeDataFiles(
+ factory.create(checkpointId), table.spec(), dataFiles,
TableUtil.formatVersion(table));
byte[] dataV1 =
SimpleVersionedSerialization.writeVersionAndSerialize(new
V1Serializer(), manifest);
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 1736d91b1b..c7e79ed8db 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -41,15 +41,15 @@ import org.slf4j.LoggerFactory;
public class FlinkManifestUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkManifestUtil.class);
- private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;
private FlinkManifestUtil() {}
static ManifestFile writeDataFiles(
- OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
throws IOException {
+ OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles, int
formatVersion)
+ throws IOException {
ManifestWriter<DataFile> writer =
- ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+ ManifestFiles.write(formatVersion, spec, outputFile,
DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DataFile> closeableWriter = writer) {
closeableWriter.addAll(dataFiles);
@@ -108,7 +108,11 @@ public class FlinkManifestUtil {
// Write the completed data files into a newly created data manifest file.
if (result.dataFiles() != null && result.dataFiles().length > 0) {
dataManifest =
- writeDataFiles(outputFileSupplier.get(), spec,
Lists.newArrayList(result.dataFiles()));
+ writeDataFiles(
+ outputFileSupplier.get(),
+ spec,
+ Lists.newArrayList(result.dataFiles()),
+ formatVersion);
}
// Write the completed delete files into a newly created delete manifest
file.
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4bbd523ec0..9ae435c263 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -213,7 +213,8 @@ public class TestFlinkManifest {
List<DataFile> dataFiles = generateDataFiles(10);
ManifestFile manifest =
- FlinkManifestUtil.writeDataFiles(factory.create(checkpointId),
table.spec(), dataFiles);
+ FlinkManifestUtil.writeDataFiles(
+ factory.create(checkpointId), table.spec(), dataFiles,
TableUtil.formatVersion(table));
byte[] dataV1 =
SimpleVersionedSerialization.writeVersionAndSerialize(new
V1Serializer(), manifest);
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 1736d91b1b..c7e79ed8db 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -41,15 +41,15 @@ import org.slf4j.LoggerFactory;
public class FlinkManifestUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkManifestUtil.class);
- private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;
private FlinkManifestUtil() {}
static ManifestFile writeDataFiles(
- OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles)
throws IOException {
+ OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles, int
formatVersion)
+ throws IOException {
ManifestWriter<DataFile> writer =
- ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
+ ManifestFiles.write(formatVersion, spec, outputFile,
DUMMY_SNAPSHOT_ID);
try (ManifestWriter<DataFile> closeableWriter = writer) {
closeableWriter.addAll(dataFiles);
@@ -108,7 +108,11 @@ public class FlinkManifestUtil {
// Write the completed data files into a newly created data manifest file.
if (result.dataFiles() != null && result.dataFiles().length > 0) {
dataManifest =
- writeDataFiles(outputFileSupplier.get(), spec,
Lists.newArrayList(result.dataFiles()));
+ writeDataFiles(
+ outputFileSupplier.get(),
+ spec,
+ Lists.newArrayList(result.dataFiles()),
+ formatVersion);
}
// Write the completed delete files into a newly created delete manifest
file.
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 4bbd523ec0..9ae435c263 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -213,7 +213,8 @@ public class TestFlinkManifest {
List<DataFile> dataFiles = generateDataFiles(10);
ManifestFile manifest =
- FlinkManifestUtil.writeDataFiles(factory.create(checkpointId),
table.spec(), dataFiles);
+ FlinkManifestUtil.writeDataFiles(
+ factory.create(checkpointId), table.spec(), dataFiles,
TableUtil.formatVersion(table));
byte[] dataV1 =
SimpleVersionedSerialization.writeVersionAndSerialize(new
V1Serializer(), manifest);