This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5de507835b2b9a93376820b79a435b8efe53b8a6
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Nov 22 11:03:13 2018 +0100

    [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.
---
 .../apache/flink/core/fs/RecoverableWriter.java    |  28 +++
 .../core/fs/local/LocalRecoverableWriter.java      |  10 +
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |  10 +
 .../writer/RecoverableMultiPartUploadImpl.java     |   5 -
 .../flink/fs/s3/common/writer/S3AccessHelper.java  |  10 +
 .../S3RecoverableMultipartUploadFactory.java       |   8 -
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  34 ++-
 .../writer/RecoverableMultiPartUploadImplTest.java |   5 +
 .../flink/fs/s3hadoop/HadoopS3AccessHelper.java    |  15 +-
 .../s3hadoop/HadoopS3RecoverableWriterITCase.java  |  46 ++++
 .../api/functions/sink/filesystem/Bucket.java      |  87 +++++--
 .../api/functions/sink/filesystem/BucketTest.java  | 263 +++++++++++++++++++++
 12 files changed, 473 insertions(+), 48 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index e5bfdb8..7d54b11 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -122,6 +122,34 @@ public interface RecoverableWriter {
        RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
 
        /**
+        * Marks whether the writer requires any additional cleanup/freeing of resources occupied
+        * as part of a {@link ResumeRecoverable}, e.g. temporary files created or objects uploaded
+        * to external systems.
+        *
+        * <p>In case cleanup is required, {@link #cleanupRecoverableState(ResumeRecoverable)} should
+        * be called.
+        *
+        * @return {@code true} if cleanup is required, {@code false} otherwise.
+        */
+       boolean requiresCleanupOfRecoverableState();
+
+       /**
+        * Frees up any resources that were previously occupied in order to be able to
+        * recover from a (potential) failure. These can be temporary files that were written
+        * to the filesystem or objects that were uploaded to S3.
+        *
+        * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
+        * been cleaned up and the resources have been freed. But the contract is that it will throw
+        * an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
+        * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
+        *
+        * @param resumable The {@link ResumeRecoverable} whose state we want to clean up.
+        * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+        * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+        */
+       boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
+
+       /**
         * Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable
         * for finalizing and committing. This will publish the target file with exactly the data
         * that was written up to the point when the CommitRecoverable was created.
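
For readers skimming the interface change: the two new methods form a pair, and the Javadoc above implies the following caller pattern. This is a minimal sketch with assumed variable and class names, not code from the patch:

    import java.io.IOException;

    import org.apache.flink.core.fs.RecoverableWriter;
    import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;

    final class RecoverableStateCleanup {

            // The guard is mandatory: per the contract above, cleanupRecoverableState()
            // throws UnsupportedOperationException when requiresCleanupOfRecoverableState()
            // returns false.
            static void cleanupIfNeeded(RecoverableWriter writer, ResumeRecoverable resumable) throws IOException {
                    if (writer.requiresCleanupOfRecoverableState()) {
                            // a 'false' return is not an error: the state may already be cleaned up
                            writer.cleanupRecoverableState(resumable);
                    }
            }
    }
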
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index a2f0485..a43e0b6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -71,6 +71,16 @@ public class LocalRecoverableWriter implements RecoverableWriter {
        }
 
        @Override
+       public boolean requiresCleanupOfRecoverableState() {
+               return false;
+       }
+
+       @Override
+       public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
                if (recoverable instanceof LocalRecoverable) {
                        return new LocalRecoverableFsDataOutputStream.LocalCommitter((LocalRecoverable) recoverable);
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 305f8ee..03d741b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -78,6 +78,16 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
        }
 
        @Override
+       public boolean requiresCleanupOfRecoverableState() {
+               return false;
+       }
+
+       @Override
+       public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
        public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
                if (recoverable instanceof HadoopFsRecoverable) {
                        return new HadoopRecoverableFsDataOutputStream.HadoopFsCommitter(fs, (HadoopFsRecoverable) recoverable);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 787f286..9d88e65 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -174,11 +174,6 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
                final String incompletePartObjectName = createIncompletePartObjectName();
                file.retain();
                try (InputStream inputStream = file.getInputStream()) {
-
-                       // TODO: staged incomplete parts are not cleaned up as
-                       // they do not fall under the user's global TTL on S3.
-                       // Figure out a way to clean them.
-
                        s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
                }
                finally {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index dbc099a..bcdea3c 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -93,6 +93,16 @@ public interface S3AccessHelper {
        CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
        /**
+        * Deletes the object associated with the provided key.
+        *
+        * @param key The key to be deleted.
+        * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+        * (e.g. the file to be deleted was not there).
+        * @throws IOException
+        */
+       boolean deleteObject(String key) throws IOException;
+
+       /**
         * Gets the object associated with the provided {@code key} from S3 and
         * puts it in the provided {@code targetLocation}.
         *
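
The deleteObject() contract mirrors the writer-level cleanup above: a false return means the object was already gone, which callers should treat as a successful no-op. A minimal sketch of such a caller follows; the class and method names are illustrative, not part of the patch:

    import java.io.IOException;

    final class StagedPartCleanup {

            // Deletes the staged incomplete-part object behind a resumable, treating
            // "object was not there" (false) as success so that cleanup stays idempotent.
            static boolean cleanupStagedPart(S3AccessHelper s3AccessHelper, String key) throws IOException {
                    if (key == null) {
                            return false; // nothing was staged for this resumable
                    }
                    return s3AccessHelper.deleteObject(key);
            }
    }
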
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index ddb09ab..3727e25 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -96,14 +96,6 @@ final class S3RecoverableMultipartUploadFactory {
                final File file = refCountedFile.getFile();
                final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
-               // some sanity checks
-               if (numBytes != file.length()) {
-                       throw new IOException(String.format("Error recovering writer: " +
-                                                       "Downloading the last data chunk file gives incorrect length. " +
-                                                       "File=%d bytes, Stream=%d bytes",
-                                       file.length(), numBytes));
-               }
-
                if (numBytes != recoverable.incompleteObjectLength()) {
                        throw new IOException(String.format("Error recovering writer: " +
                                                        "Downloading the last data chunk file gives incorrect length." +
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 698f65f..ddb4443 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -26,7 +26,6 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +36,7 @@ import java.util.concurrent.Executor;
 
 import static org.apache.flink.fs.s3.common.FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of the {@link RecoverableWriter} against S3.
@@ -54,16 +54,20 @@ public class S3RecoverableWriter implements RecoverableWriter {
 
        private final long userDefinedMinPartSize;
 
+       private final S3AccessHelper s3AccessHelper;
+
        private final S3RecoverableMultipartUploadFactory uploadFactory;
 
        @VisibleForTesting
        S3RecoverableWriter(
+                       final S3AccessHelper s3AccessHelper,
                        final S3RecoverableMultipartUploadFactory uploadFactory,
+                       final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
                        final long userDefinedMinPartSize) {
 
-               this.uploadFactory = Preconditions.checkNotNull(uploadFactory);
-               this.tempFileCreator = Preconditions.checkNotNull(tempFileCreator);
+               this.s3AccessHelper = checkNotNull(s3AccessHelper);
+               this.uploadFactory = checkNotNull(uploadFactory);
+               this.tempFileCreator = checkNotNull(tempFileCreator);
                this.userDefinedMinPartSize = userDefinedMinPartSize;
        }
 
@@ -78,14 +82,14 @@ public class S3RecoverableWriter implements RecoverableWriter {
        }
 
        @Override
-       public Committer recoverForCommit(RecoverableWriter.CommitRecoverable recoverable) throws IOException {
+       public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
                final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
                final S3RecoverableFsDataOutputStream recovered = recover(s3recoverable);
                return recovered.closeForCommit();
        }
 
        @Override
-       public S3RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable recoverable) throws IOException {
+       public S3RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
                final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);

                final RecoverableMultiPartUpload upload = uploadFactory.recoverRecoverableUpload(s3recoverable);
@@ -98,14 +102,26 @@ public class S3RecoverableWriter implements RecoverableWriter {
        }
 
        @Override
+       public boolean requiresCleanupOfRecoverableState() {
+               return true;
+       }
+
+       @Override
+       public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+               final S3Recoverable s3recoverable = castToS3Recoverable(resumable);
+               final String smallPartObjectToDelete = s3recoverable.incompleteObjectName();
+               return smallPartObjectToDelete != null && s3AccessHelper.deleteObject(smallPartObjectToDelete);
+       }
+
+       @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
-       public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
+       public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
                return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
        }
 
        @Override
        @SuppressWarnings({"rawtypes", "unchecked"})
-       public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
+       public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
                return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
        }
 
@@ -116,7 +132,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 
        // --------------------------- Utils ---------------------------
 
-       private static S3Recoverable castToS3Recoverable(RecoverableWriter.CommitRecoverable recoverable) {
+       private static S3Recoverable castToS3Recoverable(CommitRecoverable recoverable) {
                if (recoverable instanceof S3Recoverable) {
                        return (S3Recoverable) recoverable;
                }
@@ -144,6 +160,6 @@ public class S3RecoverableWriter implements RecoverableWriter {
                                                uploadThreadPool,
                                                tempFileCreator);
 
-               return new S3RecoverableWriter(uploadFactory, tempFileCreator, userDefinedMinPartSize);
+               return new S3RecoverableWriter(s3AccessHelper, uploadFactory, tempFileCreator, userDefinedMinPartSize);
        }
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index a986111..673796d 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -373,6 +373,11 @@ public class RecoverableMultiPartUploadImplTest {
                }
 
                @Override
+               public boolean deleteObject(String key) throws IOException {
+                       return false;
+               }
+
+               @Override
                public long getObject(String key, File targetLocation) throws IOException {
                        return 0;
                }
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index 473439c..b9612ad 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -86,6 +86,11 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
        }
 
        @Override
+       public boolean deleteObject(String key) throws IOException {
+               return s3a.delete(new org.apache.hadoop.fs.Path('/' + key), false);
+       }
+
+       @Override
        public long getObject(String key, File targetLocation) throws IOException {
                long numBytes = 0L;
                try (
@@ -96,12 +101,20 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
                        final byte[] buffer = new byte[32 * 1024];
 
                        int numRead;
-                       while ((numRead = inStream.read(buffer)) > 0) {
+                       while ((numRead = inStream.read(buffer)) != -1) {
                                outStream.write(buffer, 0, numRead);
                                numBytes += numRead;
                        }
                }
 
+               // some sanity checks
+               if (numBytes != targetLocation.length()) {
+                       throw new IOException(String.format("Error recovering writer: " +
+                                                       "Downloading the last data chunk file gives incorrect length. " +
+                                                       "File=%d bytes, Stream=%d bytes",
+                                       targetLocation.length(), numBytes));
+               }
+
                return numBytes;
        }
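
The loop-condition change above is worth calling out: InputStream.read(byte[]) signals end-of-stream only with -1, while a return of 0 is legal (e.g. for a zero-length buffer or certain wrapping streams) and must not terminate the copy. A self-contained sketch of the canonical loop, in a hypothetical class but with the same logic as the patched code:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    final class StreamCopy {

            // Only -1 means EOF; a 0-byte read must not stop the loop early.
            static long copy(InputStream in, OutputStream out) throws IOException {
                    final byte[] buffer = new byte[32 * 1024];
                    long numBytes = 0L;
                    int numRead;
                    while ((numRead = in.read(buffer)) != -1) {
                            out.write(buffer, 0, numRead);
                            numBytes += numRead;
                    }
                    return numBytes;
            }
    }
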
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
index 17fb02b..6c8619d 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.fs.s3.common.writer.S3Recoverable;
 import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.StringUtils;
@@ -42,6 +43,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
@@ -213,6 +215,50 @@ public class HadoopS3RecoverableWriterITCase extends TestLogger {
                Assert.assertEquals(testData1 + testData2, getContentsOfFile(path));
        }
 
+       @Test(expected = FileNotFoundException.class)
+       public void testCleanupRecoverableState() throws Exception {
+               final RecoverableWriter writer = getRecoverableWriter();
+               final Path path = new Path(basePathForTest, "part-0");
+
+               final RecoverableFsDataOutputStream stream = writer.open(path);
+               stream.write(bytesOf(testData1));
+               S3Recoverable recoverable = (S3Recoverable) stream.persist();
+
+               stream.closeForCommit().commit();
+
+               // the data is still there as we have not deleted it from the tmp object
+               final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+               Assert.assertEquals(testData1, content);
+
+               boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable);
+               Assert.assertTrue(successfullyDeletedState);
+
+               // this should throw the exception as we deleted the file.
+               getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+       }
+
+       @Test
+       public void testCallingDeleteObjectTwiceDoesNotThrowException() throws Exception {
+               final RecoverableWriter writer = getRecoverableWriter();
+               final Path path = new Path(basePathForTest, "part-0");
+
+               final RecoverableFsDataOutputStream stream = writer.open(path);
+               stream.write(bytesOf(testData1));
+               S3Recoverable recoverable = (S3Recoverable) stream.persist();
+
+               stream.closeForCommit().commit();
+
+               // the data is still there as we have not deleted it from the tmp object
+               final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+               Assert.assertEquals(testData1, content);
+
+               boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable);
+               Assert.assertTrue(successfullyDeletedState);
+
+               boolean unsuccessfulDeletion = writer.cleanupRecoverableState(recoverable);
+               Assert.assertFalse(unsuccessfulDeletion);
+       }
+
        // ----------------------- Test Recovery -----------------------
 
        @Test
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 8ba35b8..b59c84e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,11 +33,12 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Objects;
+import java.util.TreeMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -65,14 +68,16 @@ public class Bucket<IN, BucketID> {
 
        private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-       private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint;
+       private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
+
+       private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
 
        private long partCounter;
 
        @Nullable
        private PartFileWriter<IN, BucketID> inProgressPart;
 
-       private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
+       private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
 
        /**
         * Constructor to create a new empty bucket.
@@ -95,7 +100,8 @@ public class Bucket<IN, BucketID> {
                this.rollingPolicy = checkNotNull(rollingPolicy);
 
                this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
-               this.pendingPartsPerCheckpoint = new HashMap<>();
+               this.pendingPartsPerCheckpoint = new TreeMap<>();
+               this.resumablesPerCheckpoint = new TreeMap<>();
        }
 
        /**
@@ -123,21 +129,26 @@ public class Bucket<IN, BucketID> {
 
        private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
+               if (!state.hasInProgressResumableFile()) {
+                       return;
+               }
 
                // we try to resume the previous in-progress file
-               if (state.hasInProgressResumableFile()) {
-                       final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
-                       final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
-                       inProgressPart = partFileFactory.resumeFrom(
-                                       bucketId, stream, resumable, state.getInProgressFileCreationTime());
+               final ResumeRecoverable resumable = state.getInProgressResumableFile();
+               final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
+               inProgressPart = partFileFactory.resumeFrom(
+                               bucketId, stream, resumable, state.getInProgressFileCreationTime());
+
+               if (fsWriter.requiresCleanupOfRecoverableState()) {
+                       fsWriter.cleanupRecoverableState(resumable);
                }
        }
 
        private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
 
                // we commit pending files for checkpoints that precede the last successful one, from which we are recovering
-               for (List<RecoverableWriter.CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
-                       for (RecoverableWriter.CommitRecoverable committable: committables) {
+               for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
+                       for (CommitRecoverable committable: committables) {
                                fsWriter.recoverForCommit(committable).commitAfterRecovery();
                        }
                }
@@ -172,7 +183,7 @@ public class Bucket<IN, BucketID> {
                checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
                checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
 
-               RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
+               CommitRecoverable committable = bucket.closePartFile();
                if (committable != null) {
                        pendingPartsForCurrentCheckpoint.add(committable);
                }
@@ -214,8 +225,8 @@ public class Bucket<IN, BucketID> {
                return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
        }
 
-       private RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
-               RecoverableWriter.CommitRecoverable committable = null;
+       private CommitRecoverable closePartFile() throws IOException {
+               CommitRecoverable committable = null;
                if (inProgressPart != null) {
                        committable = inProgressPart.closeForCommit();
                        pendingPartsForCurrentCheckpoint.add(committable);
@@ -233,12 +244,21 @@ public class Bucket<IN, BucketID> {
        BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
                prepareBucketForCheckpointing(checkpointId);
 
-               RecoverableWriter.ResumeRecoverable inProgressResumable = null;
+               ResumeRecoverable inProgressResumable = null;
                long inProgressFileCreationTime = Long.MAX_VALUE;
 
                if (inProgressPart != null) {
                        inProgressResumable = inProgressPart.persist();
                        inProgressFileCreationTime = 
inProgressPart.getCreationTime();
+
+                       // the following is an optimization: writers that do not
+                       // require cleanup do not have to keep track of resumables
+                       // and later iterate over the active buckets.
+                       // (see onSuccessfulCompletionOfCheckpoint())
+
+                       if (fsWriter.requiresCleanupOfRecoverableState()) {
+                               this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
+                       }
                }
 
                return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
@@ -261,17 +281,34 @@ public class Bucket<IN, BucketID> {
        void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
                checkNotNull(fsWriter);
 
-               Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
-                               pendingPartsPerCheckpoint.entrySet().iterator();
+               Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
+                               pendingPartsPerCheckpoint.headMap(checkpointId, true)
+                                               .entrySet().iterator();
+
+               while (it.hasNext()) {
+                       Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
+
+                       for (CommitRecoverable committable : entry.getValue()) {
+                               fsWriter.recoverForCommit(committable).commit();
+                       }
+                       it.remove();
+               }
+
+               cleanupOutdatedResumables(checkpointId);
+       }
+
+       private void cleanupOutdatedResumables(long checkpointId) throws IOException {
+               Iterator<Map.Entry<Long, ResumeRecoverable>> it =
+                               resumablesPerCheckpoint.headMap(checkpointId, false)
+                                               .entrySet().iterator();
 
                while (it.hasNext()) {
-                       Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+                       final ResumeRecoverable recoverable = it.next().getValue();
+                       final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
+                       it.remove();
 
-                       if (entry.getKey() <= checkpointId) {
-                               for (RecoverableWriter.CommitRecoverable committable : entry.getValue()) {
-                                       fsWriter.recoverForCommit(committable).commit();
-                               }
-                               it.remove();
+                       if (LOG.isDebugEnabled() && successfullyDeleted) {
+                               LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
                        }
                }
        }
@@ -290,7 +327,7 @@ public class Bucket<IN, BucketID> {
        // --------------------------- Testing Methods -----------------------------
 
        @VisibleForTesting
-       Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPartsPerCheckpoint() {
+       Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
                return pendingPartsPerCheckpoint;
        }
 
@@ -301,7 +338,7 @@ public class Bucket<IN, BucketID> {
        }
 
        @VisibleForTesting
-       List<RecoverableWriter.CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
+       List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
                return pendingPartsForCurrentCheckpoint;
        }
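
The switch from HashMap to TreeMap/NavigableMap is what makes the pruning in onSuccessfulCompletionOfCheckpoint() and cleanupOutdatedResumables() cheap: headMap() yields a view of all entries up to the acknowledged checkpoint (inclusive for pending parts, exclusive for resumables), and removals through that view hit the backing map. A minimal, self-contained illustration of the idiom, with hypothetical values rather than code from the patch:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    final class HeadMapPruning {

            public static void main(String[] args) {
                    final NavigableMap<Long, String> resumablesPerCheckpoint = new TreeMap<>();
                    resumablesPerCheckpoint.put(0L, "resumable-0");
                    resumablesPerCheckpoint.put(1L, "resumable-1");
                    resumablesPerCheckpoint.put(2L, "resumable-2");

                    // headMap(2L, false) views keys < 2, i.e. checkpoints 0 and 1;
                    // clearing the view also removes the entries from the backing map.
                    resumablesPerCheckpoint.headMap(2L, false).clear();

                    System.out.println(resumablesPerCheckpoint); // prints {2=resumable-2}
            }
    }
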
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
new file mode 100644
index 0000000..f328fd7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.fs.local.LocalRecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Bucket}.
+ */
+public class BucketTest {
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+       @Test
+       public void shouldNotCleanupResumablesThatArePartOfTheAckedCheckpoint() throws IOException {
+               final File outDir = TEMP_FOLDER.newFolder();
+               final Path path = new Path(outDir.toURI());
+
+               final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+               final Bucket<String, String> bucketUnderTest =
+                               createBucket(recoverableWriter, path, 0, 0);
+
+               bucketUnderTest.write("test-element", 0L);
+
+               final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+               assertThat(state, hasActiveInProgressFile());
+
+               bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+               assertThat(recoverableWriter, hasCalledDiscard(0)); // it did not discard as this is still valid.
+       }
+
+       @Test
+       public void shouldCleanupOutdatedResumablesOnCheckpointAck() throws IOException {
+               final File outDir = TEMP_FOLDER.newFolder();
+               final Path path = new Path(outDir.toURI());
+
+               final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+               final Bucket<String, String> bucketUnderTest =
+                               createBucket(recoverableWriter, path, 0, 0);
+
+               bucketUnderTest.write("test-element", 0L);
+
+               final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+               assertThat(state, hasActiveInProgressFile());
+
+               bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+               bucketUnderTest.onReceptionOfCheckpoint(1L);
+               bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+               bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+               assertThat(recoverableWriter, hasCalledDiscard(2)); // that is for checkpoints 0 and 1
+       }
+
+       @Test
+       public void shouldCleanupResumableAfterRestoring() throws Exception {
+               final File outDir = TEMP_FOLDER.newFolder();
+               final Path path = new Path(outDir.toURI());
+
+               final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+               final Bucket<String, String> bucketUnderTest =
+                               createBucket(recoverableWriter, path, 0, 0);
+
+               bucketUnderTest.write("test-element", 0L);
+
+               final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+               assertThat(state, hasActiveInProgressFile());
+
+               bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+               final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
+               restoreBucket(newRecoverableWriter, 0, 1, state);
+
+               assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for the resumable of the restored in-progress file
+       }
+
+       @Test
+       public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
+               final File outDir = TEMP_FOLDER.newFolder();
+               final Path path = new Path(outDir.toURI());
+
+               final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+               final Bucket<String, String> bucketUnderTest =
+                               createBucket(recoverableWriter, path, 0, 0);
+
+               final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+               assertThat(state, hasNoActiveInProgressFile());
+
+               bucketUnderTest.onReceptionOfCheckpoint(1L);
+               bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+               bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+               assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file.
+       }
+
+       // ------------------------------- Matchers --------------------------------
+
+       private static TypeSafeMatcher<TestRecoverableWriter> hasCalledDiscard(int times) {
+               return new TypeSafeMatcher<TestRecoverableWriter>() {
+                       @Override
+                       protected boolean matchesSafely(TestRecoverableWriter writer) {
+                               return writer.getCleanupCallCounter() == times;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description
+                                               .appendText("the TestRecoverableWriter to have called discardRecoverableState() ")
+                                               .appendValue(times)
+                                               .appendText(" times.");
+                       }
+               };
+       }
+
+       private static TypeSafeMatcher<BucketState<String>> hasActiveInProgressFile() {
+               return new TypeSafeMatcher<BucketState<String>>() {
+                       @Override
+                       protected boolean matchesSafely(BucketState<String> state) {
+                               return state.getInProgressResumableFile() != null;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("a BucketState with active in-progress file.");
+                       }
+               };
+       }
+
+       private static TypeSafeMatcher<BucketState<String>> hasNoActiveInProgressFile() {
+               return new TypeSafeMatcher<BucketState<String>>() {
+                       @Override
+                       protected boolean matchesSafely(BucketState<String> state) {
+                               return state.getInProgressResumableFile() == null;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("a BucketState with no active in-progress file.");
+                       }
+               };
+       }
+
+       // ------------------------------- Mock Classes --------------------------------
+
+       private static class TestRecoverableWriter extends LocalRecoverableWriter {
+
+               private int cleanupCallCounter = 0;
+
+               TestRecoverableWriter(LocalFileSystem fs) {
+                       super(fs);
+               }
+
+               int getCleanupCallCounter() {
+                       return cleanupCallCounter;
+               }
+
+               @Override
+               public boolean requiresCleanupOfRecoverableState() {
+                       // here we return true so that cleanupRecoverableState() is called.
+                       return true;
+               }
+
+               @Override
+               public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+                       cleanupCallCounter++;
+                       return false;
+               }
+
+               @Override
+               public String toString() {
+                       return "TestRecoverableWriter has called discardRecoverableState() " + cleanupCallCounter + " times.";
+               }
+       }
+
+       // ------------------------------- Utility Methods --------------------------------
+
+       private static final String bucketId = "testing-bucket";
+
+       private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create().build();
+
+       private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
+                       new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+
+       private static Bucket<String, String> createBucket(
+                       final RecoverableWriter writer,
+                       final Path bucketPath,
+                       final int subtaskIdx,
+                       final int initialPartCounter) {
+
+               return Bucket.getNew(
+                               writer,
+                               subtaskIdx,
+                               bucketId,
+                               bucketPath,
+                               initialPartCounter,
+                               partFileFactory,
+                               rollingPolicy);
+       }
+
+       private static Bucket<String, String> restoreBucket(
+                       final RecoverableWriter writer,
+                       final int subtaskIndex,
+                       final long initialPartCounter,
+                       final BucketState<String> bucketState) throws Exception {
+
+               return Bucket.restore(
+                               writer,
+                               subtaskIndex,
+                               initialPartCounter,
+                               partFileFactory,
+                               rollingPolicy,
+                               bucketState
+               );
+       }
+
+       private static TestRecoverableWriter getRecoverableWriter(Path path) {
+               try {
+                       final FileSystem fs = FileSystem.get(path.toUri());
+                       if (!(fs instanceof LocalFileSystem)) {
+                               fail("Expected Local FS but got a " + fs.getClass().getName() + " for path: " + path);
+                       }
+                       return new TestRecoverableWriter((LocalFileSystem) fs);
+               } catch (IOException e) {
+                       fail();
+               }
+               return null;
+       }
+}
