This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new f287fb8f3c8 [FLINK-34117][filesystem] Fix CompactCoordinator data loss 
upon termination
f287fb8f3c8 is described below

commit f287fb8f3c89cad1db028d8119c748b1c7b2194b
Author: Alexander Fedulov <[email protected]>
AuthorDate: Fri Jan 26 10:38:49 2024 +0100

    [FLINK-34117][filesystem] Fix CompactCoordinator data loss upon termination
    
    This closes #24195.
---
 .../connector/file/table/stream/compact/CompactCoordinator.java   | 4 ++++
 .../file/table/stream/compact/CompactCoordinatorTest.java         | 8 ++++++--
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
index b4ae2a8a5f3..156a095c9fd 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinator.java
@@ -152,6 +152,10 @@ public class CompactCoordinator extends 
AbstractStreamOperator<CoordinatorOutput
         for (Map.Entry<Long, Map<String, List<Path>>> entry : 
headMap.entrySet()) {
             coordinate(entry.getKey(), entry.getValue());
         }
+        if (checkpointId == Long.MAX_VALUE) {
+            coordinate(checkpointId, currentInputFiles);
+            currentInputFiles.clear();
+        }
         headMap.clear();
     }
 
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
index ae6bb8a89f9..cfbb7d4dc56 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/compact/CompactCoordinatorTest.java
@@ -84,10 +84,12 @@ class CompactCoordinatorTest extends 
AbstractCompactTestBase {
                     harness.open();
 
                     harness.processElement(new EndCheckpoint(2, 0, 1), 0);
-
+                    harness.processElement(new InputFile("p2", newFile("f9", 
4)), 0);
+                    // Pipeline terminates
+                    harness.processElement(new EndCheckpoint(Long.MAX_VALUE, 
0, 1), 0);
                     List<CoordinatorOutput> outputs = 
harness.extractOutputValues();
 
-                    assertThat(outputs).hasSize(7);
+                    assertThat(outputs).hasSize(9);
 
                     List<CompactionUnit> cp1Units = new ArrayList<>();
                     for (int i = 0; i < 4; i++) {
@@ -109,6 +111,8 @@ class CompactCoordinatorTest extends 
AbstractCompactTestBase {
                     assertUnit(outputs.get(5), 0, "p0", Arrays.asList("f7", 
"f8"));
 
                     assertEndCompaction(outputs.get(6), 2);
+                    assertUnit(outputs.get(7), 0, "p2", 
Collections.singletonList("f9"));
+                    assertEndCompaction(outputs.get(8), Long.MAX_VALUE);
                 });
     }
 

Reply via email to