This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 34034a6 [FLINK-26151]Avoid inprogressfileRecoverable not be clean up after restoring the bucket
34034a6 is described below
commit 34034a6c54f3b23825bc318806c683b3495f72d1
Author: lovewin99 <[email protected]>
AuthorDate: Tue Mar 1 10:18:05 2022 +0800
[FLINK-26151]Avoid inprogressfileRecoverable not be clean up after restoring the bucket
This closes #18776.
---
.../api/functions/sink/filesystem/Bucket.java | 1 +
.../api/functions/sink/filesystem/BucketTest.java | 27 ++++++++++++++++++++++
2 files changed, 28 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index f1f34ba..8b5d2ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -142,6 +142,7 @@ public class Bucket<IN, BucketID> {
bucketId,
inProgressFileRecoverable,
state.getInProgressFileCreationTime());
+ inProgressFileRecoverablesPerCheckpoint.put(Long.MIN_VALUE, inProgressFileRecoverable);
} else {
// if the writer does not support resume, then we close the
// in-progress part and commit it, as done in the case of pending files.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 9fc09cd..245e895 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -117,6 +117,33 @@ public class BucketTest {
assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file.
}
+ @Test
+ public void shouldCleanupOutdatedResumablesAfterResumed() throws Exception {
+ final File outDir = TEMP_FOLDER.newFolder();
+ final Path path = new Path(outDir.toURI());
+
+ final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+ final Bucket<String, String> bucketUnderTest =
+ createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
+
+ bucketUnderTest.write("test-element", 0L);
+ final BucketState<String> state0 = bucketUnderTest.onReceptionOfCheckpoint(0L);
+ assertThat(state0, hasActiveInProgressFile());
+ bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+ assertThat(recoverableWriter, hasCalledDiscard(0));
+
+ final File newOutDir = TEMP_FOLDER.newFolder();
+ final Path newPath = new Path(newOutDir.toURI());
+ final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(newPath);
+ final Bucket<String, String> bucketAfterResume =
+ restoreBucket(
+ newRecoverableWriter, 0, 0, state0, OutputFileConfig.builder().build());
+ final BucketState<String> state1 = bucketAfterResume.onReceptionOfCheckpoint(1L);
+ assertThat(state1, hasActiveInProgressFile());
+ bucketAfterResume.onSuccessfulCompletionOfCheckpoint(1L);
+ assertThat(newRecoverableWriter, hasCalledDiscard(1));
+ }
+
// --------------------------- Checking Restore ---------------------------
@Test