This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new f287fb8f3c8 [FLINK-34117][filesystem] Fix CompactCoordinator data loss
upon termination
f287fb8f3c8 is described below
commit f287fb8f3c89cad1db028d8119c748b1c7b2194b
Author: Alexander Fedulov <[email protected]>
AuthorDate: Fri Jan 26 10:38:49 2024 +0100
[FLINK-34117][filesystem] Fix CompactCoordinator data loss upon termination
This closes #24195.
---
.../connector/file/table/stream/compact/CompactCoordinator.java | 4 ++++
.../file/table/stream/compact/CompactCoordinatorTest.java | 8 ++++++--
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
index b4ae2a8a5f3..156a095c9fd 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
@@ -152,6 +152,10 @@ public class CompactCoordinator extends
AbstractStreamOperator<CoordinatorOutput
for (Map.Entry<Long, Map<String, List<Path>>> entry :
headMap.entrySet()) {
coordinate(entry.getKey(), entry.getValue());
}
+ if (checkpointId == Long.MAX_VALUE) {
+ coordinate(checkpointId, currentInputFiles);
+ currentInputFiles.clear();
+ }
headMap.clear();
}
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
index ae6bb8a89f9..cfbb7d4dc56 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
@@ -84,10 +84,12 @@ class CompactCoordinatorTest extends
AbstractCompactTestBase {
harness.open();
harness.processElement(new EndCheckpoint(2, 0, 1), 0);
-
+ harness.processElement(new InputFile("p2", newFile("f9",
4)), 0);
+ // Pipeline terminates
+ harness.processElement(new EndCheckpoint(Long.MAX_VALUE,
0, 1), 0);
List<CoordinatorOutput> outputs =
harness.extractOutputValues();
- assertThat(outputs).hasSize(7);
+ assertThat(outputs).hasSize(9);
List<CompactionUnit> cp1Units = new ArrayList<>();
for (int i = 0; i < 4; i++) {
@@ -109,6 +111,8 @@ class CompactCoordinatorTest extends
AbstractCompactTestBase {
assertUnit(outputs.get(5), 0, "p0", Arrays.asList("f7",
"f8"));
assertEndCompaction(outputs.get(6), 2);
+ assertUnit(outputs.get(7), 0, "p2",
Collections.singletonList("f9"));
+ assertEndCompaction(outputs.get(8), Long.MAX_VALUE);
});
}