This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5de507835b2b9a93376820b79a435b8efe53b8a6
Author: Kostas Kloudas <[email protected]>
AuthorDate: Thu Nov 22 11:03:13 2018 +0100

    [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.
---
 .../apache/flink/core/fs/RecoverableWriter.java    |  28 +++
 .../core/fs/local/LocalRecoverableWriter.java      |  10 +
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |  10 +
 .../writer/RecoverableMultiPartUploadImpl.java     |   5 -
 .../flink/fs/s3/common/writer/S3AccessHelper.java  |  10 +
 .../S3RecoverableMultipartUploadFactory.java       |   8 -
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  34 ++-
 .../writer/RecoverableMultiPartUploadImplTest.java |   5 +
 .../flink/fs/s3hadoop/HadoopS3AccessHelper.java    |  15 +-
 .../s3hadoop/HadoopS3RecoverableWriterITCase.java  |  46 ++++
 .../api/functions/sink/filesystem/Bucket.java      |  87 +++++--
 .../api/functions/sink/filesystem/BucketTest.java  | 263 +++++++++++++++++++++
 12 files changed, 473 insertions(+), 48 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index e5bfdb8..7d54b11 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -122,6 +122,34 @@ public interface RecoverableWriter {
 	RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
 
 	/**
+	 * Marks if the writer needs to do any additional cleanup/freeing of resources occupied
+	 * as part of a {@link ResumeRecoverable}, e.g. temporary files created or objects uploaded
+	 * to external systems.
+	 *
+	 * <p>In case cleanup is required, then {@link #cleanupRecoverableState(ResumeRecoverable)} should
+	 * be called.
+	 *
+	 * @return {@code true} if cleanup is required, {@code false} otherwise.
+	 */
+	boolean requiresCleanupOfRecoverableState();
+
+	/**
+	 * Frees up any resources that were previously occupied in order to be able to
+	 * recover from a (potential) failure. These can be temporary files that were written
+	 * to the filesystem or objects that were uploaded to S3.
+	 *
+	 * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
+	 * been cleaned up and the resources have been freed. But the contract is that it will throw
+	 * an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
+	 * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
+	 *
+	 * @param resumable The {@link ResumeRecoverable} whose state we want to clean up.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+	 */
+	boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
+
+	/**
 	 * Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable
 	 * for finalizing and committing. This will publish the target file with exactly the data
 	 * that was written up to the point then the CommitRecoverable was created.
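For context, a minimal caller-side sketch of the contract introduced above; the helper name is invented for illustration, but the Bucket changes later in this commit apply the same pattern:

import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;

import java.io.IOException;

final class CleanupExample {

	// Hypothetical helper: callers must guard cleanupRecoverableState() with
	// requiresCleanupOfRecoverableState(), since writers that keep no such
	// state (local, HDFS) throw UnsupportedOperationException on cleanup.
	static boolean cleanupIfRequired(RecoverableWriter writer, ResumeRecoverable resumable) throws IOException {
		return writer.requiresCleanupOfRecoverableState()
				&& writer.cleanupRecoverableState(resumable);
	}
}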
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index a2f0485..a43e0b6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -71,6 +71,16 @@ public class LocalRecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return false;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		if (recoverable instanceof LocalRecoverable) {
 			return new LocalRecoverableFsDataOutputStream.LocalCommitter((LocalRecoverable) recoverable);
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 305f8ee..03d741b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -78,6 +78,16 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return false;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		if (recoverable instanceof HadoopFsRecoverable) {
 			return new HadoopRecoverableFsDataOutputStream.HadoopFsCommitter(fs, (HadoopFsRecoverable) recoverable);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 787f286..9d88e65 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -174,11 +174,6 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		final String incompletePartObjectName = createIncompletePartObjectName();
 		file.retain();
 		try (InputStream inputStream = file.getInputStream()) {
-
-			// TODO: staged incomplete parts are not cleaned up as
-			// they do not fall under the user's global TTL on S3.
-			// Figure out a way to clean them.
-
 			s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
 		} finally {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index dbc099a..bcdea3c 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -93,6 +93,16 @@ public interface S3AccessHelper {
 	CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
 	/**
+	 * Deletes the object associated with the provided key.
+	 *
+	 * @param key The key to be deleted.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there).
+	 * @throws IOException
+	 */
+	boolean deleteObject(String key) throws IOException;
+
+	/**
 	 * Gets the object associated with the provided {@code key} from S3 and
 	 * puts it in the provided {@code targetLocation}.
 	 *
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index ddb09ab..3727e25 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -96,14 +96,6 @@ final class S3RecoverableMultipartUploadFactory {
 		final File file = refCountedFile.getFile();
 		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
-		// some sanity checks
-		if (numBytes != file.length()) {
-			throw new IOException(String.format("Error recovering writer: " +
-					"Downloading the last data chunk file gives incorrect length. " +
-					"File=%d bytes, Stream=%d bytes",
-					file.length(), numBytes));
-		}
-
 		if (numBytes != recoverable.incompleteObjectLength()) {
 			throw new IOException(String.format("Error recovering writer: " +
 					"Downloading the last data chunk file gives incorrect length." +
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 698f65f..ddb4443 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -26,7 +26,6 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +36,7 @@ import java.util.concurrent.Executor;
 
 import static org.apache.flink.fs.s3.common.FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of the {@link RecoverableWriter} against S3.
@@ -54,16 +54,20 @@ public class S3RecoverableWriter implements RecoverableWriter {
 
 	private final long userDefinedMinPartSize;
 
+	private final S3AccessHelper s3AccessHelper;
+
 	private final S3RecoverableMultipartUploadFactory uploadFactory;
 
 	@VisibleForTesting
 	S3RecoverableWriter(
+			final S3AccessHelper s3AccessHelper,
 			final S3RecoverableMultipartUploadFactory uploadFactory,
 			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
 			final long userDefinedMinPartSize) {
 
-		this.uploadFactory = Preconditions.checkNotNull(uploadFactory);
-		this.tempFileCreator = Preconditions.checkNotNull(tempFileCreator);
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
+		this.uploadFactory = checkNotNull(uploadFactory);
+		this.tempFileCreator = checkNotNull(tempFileCreator);
 		this.userDefinedMinPartSize = userDefinedMinPartSize;
 	}
 
@@ -78,14 +82,14 @@ public class S3RecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
-	public Committer recoverForCommit(RecoverableWriter.CommitRecoverable recoverable) throws IOException {
+	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
 		final S3RecoverableFsDataOutputStream recovered = recover(s3recoverable);
 		return recovered.closeForCommit();
 	}
 
 	@Override
-	public S3RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable recoverable) throws IOException {
+	public S3RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
 		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
 
 		final RecoverableMultiPartUpload upload = uploadFactory.recoverRecoverableUpload(s3recoverable);
@@ -98,14 +102,26 @@ public class S3RecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return true;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		final S3Recoverable s3recoverable = castToS3Recoverable(resumable);
+		final String smallPartObjectToDelete = s3recoverable.incompleteObjectName();
+		return smallPartObjectToDelete != null && s3AccessHelper.deleteObject(smallPartObjectToDelete);
+	}
+
+	@Override
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
+	public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
 		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
 	}
 
 	@Override
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
+	public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
 		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
 	}
 
@@ -116,7 +132,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 
 	// --------------------------- Utils ---------------------------
 
-	private static S3Recoverable castToS3Recoverable(RecoverableWriter.CommitRecoverable recoverable) {
+	private static S3Recoverable castToS3Recoverable(CommitRecoverable recoverable) {
 		if (recoverable instanceof S3Recoverable) {
 			return (S3Recoverable) recoverable;
 		}
@@ -144,6 +160,6 @@ public class S3RecoverableWriter implements RecoverableWriter {
 				uploadThreadPool,
 				tempFileCreator);
 
-		return new S3RecoverableWriter(uploadFactory, tempFileCreator, userDefinedMinPartSize);
+		return new S3RecoverableWriter(s3AccessHelper, uploadFactory, tempFileCreator, userDefinedMinPartSize);
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index a986111..673796d 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -373,6 +373,11 @@ public class RecoverableMultiPartUploadImplTest {
 		}
 
 		@Override
+		public boolean deleteObject(String key) throws IOException {
+			return false;
+		}
+
+		@Override
 		public long getObject(String key, File targetLocation) throws IOException {
 			return 0;
 		}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index 473439c..b9612ad 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -86,6 +86,11 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
+	public boolean deleteObject(String key) throws IOException {
+		return s3a.delete(new org.apache.hadoop.fs.Path('/' + key), false);
+	}
+
+	@Override
 	public long getObject(String key, File targetLocation) throws IOException {
 		long numBytes = 0L;
 		try (
@@ -96,12 +101,20 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 			final byte[] buffer = new byte[32 * 1024];
 			int numRead;
 
-			while ((numRead = inStream.read(buffer)) > 0) {
+			while ((numRead = inStream.read(buffer)) != -1) {
 				outStream.write(buffer, 0, numRead);
 				numBytes += numRead;
 			}
 		}
 
+		// some sanity checks
+		if (numBytes != targetLocation.length()) {
+			throw new IOException(String.format("Error recovering writer: " +
+					"Downloading the last data chunk file gives incorrect length. " +
+					"File=%d bytes, Stream=%d bytes",
+					targetLocation.length(), numBytes));
+		}
+
 		return numBytes;
 	}
" + + "File=%d bytes, Stream=%d bytes", + targetLocation.length(), numBytes)); + } + return numBytes; } diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java index 17fb02b..6c8619d 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.fs.s3.common.FlinkS3FileSystem; +import org.apache.flink.fs.s3.common.writer.S3Recoverable; import org.apache.flink.testutils.s3.S3TestCredentials; import org.apache.flink.util.MathUtils; import org.apache.flink.util.StringUtils; @@ -42,6 +43,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.BufferedReader; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; @@ -213,6 +215,50 @@ public class HadoopS3RecoverableWriterITCase extends TestLogger { Assert.assertEquals(testData1 + testData2, getContentsOfFile(path)); } + @Test(expected = FileNotFoundException.class) + public void testCleanupRecoverableState() throws Exception { + final RecoverableWriter writer = getRecoverableWriter(); + final Path path = new Path(basePathForTest, "part-0"); + + final RecoverableFsDataOutputStream stream = writer.open(path); + stream.write(bytesOf(testData1)); + S3Recoverable recoverable = (S3Recoverable) stream.persist(); + + stream.closeForCommit().commit(); + + // still the data is there as we have not deleted them from the tmp object + final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName())); + Assert.assertEquals(testData1, content); + + boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable); + Assert.assertTrue(successfullyDeletedState); + + // this should throw the exception as we deleted the file. 
+ getContentsOfFile(new Path('/' + recoverable.incompleteObjectName())); + } + + @Test + public void testCallingDeleteObjectTwiceDoesNotThroughException() throws Exception { + final RecoverableWriter writer = getRecoverableWriter(); + final Path path = new Path(basePathForTest, "part-0"); + + final RecoverableFsDataOutputStream stream = writer.open(path); + stream.write(bytesOf(testData1)); + S3Recoverable recoverable = (S3Recoverable) stream.persist(); + + stream.closeForCommit().commit(); + + // still the data is there as we have not deleted them from the tmp object + final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName())); + Assert.assertEquals(testData1, content); + + boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable); + Assert.assertTrue(successfullyDeletedState); + + boolean unsuccessfulDeletion = writer.cleanupRecoverableState(recoverable); + Assert.assertFalse(unsuccessfulDeletion); + } + // ----------------------- Test Recovery ----------------------- @Test diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java index 8ba35b8..b59c84e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java @@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable; +import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,11 +33,12 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Objects; +import java.util.TreeMap; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -65,14 +68,16 @@ public class Bucket<IN, BucketID> { private final RollingPolicy<IN, BucketID> rollingPolicy; - private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint; + private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint; + + private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint; private long partCounter; @Nullable private PartFileWriter<IN, BucketID> inProgressPart; - private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint; + private List<CommitRecoverable> pendingPartsForCurrentCheckpoint; /** * Constructor to create a new empty bucket. 
@@ -95,7 +100,8 @@ public class Bucket<IN, BucketID> { this.rollingPolicy = checkNotNull(rollingPolicy); this.pendingPartsForCurrentCheckpoint = new ArrayList<>(); - this.pendingPartsPerCheckpoint = new HashMap<>(); + this.pendingPartsPerCheckpoint = new TreeMap<>(); + this.resumablesPerCheckpoint = new TreeMap<>(); } /** @@ -123,21 +129,26 @@ public class Bucket<IN, BucketID> { } private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException { + if (!state.hasInProgressResumableFile()) { + return; + } // we try to resume the previous in-progress file - if (state.hasInProgressResumableFile()) { - final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile(); - final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable); - inProgressPart = partFileFactory.resumeFrom( - bucketId, stream, resumable, state.getInProgressFileCreationTime()); + final ResumeRecoverable resumable = state.getInProgressResumableFile(); + final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable); + inProgressPart = partFileFactory.resumeFrom( + bucketId, stream, resumable, state.getInProgressFileCreationTime()); + + if (fsWriter.requiresCleanupOfRecoverableState()) { + fsWriter.cleanupRecoverableState(resumable); } } private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException { // we commit pending files for checkpoints that precess the last successful one, from which we are recovering - for (List<RecoverableWriter.CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) { - for (RecoverableWriter.CommitRecoverable committable: committables) { + for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) { + for (CommitRecoverable committable: committables) { fsWriter.recoverForCommit(committable).commitAfterRecovery(); } } @@ -172,7 +183,7 @@ public class Bucket<IN, BucketID> { checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty()); checkState(bucket.pendingPartsPerCheckpoint.isEmpty()); - RecoverableWriter.CommitRecoverable committable = bucket.closePartFile(); + CommitRecoverable committable = bucket.closePartFile(); if (committable != null) { pendingPartsForCurrentCheckpoint.add(committable); } @@ -214,8 +225,8 @@ public class Bucket<IN, BucketID> { return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter); } - private RecoverableWriter.CommitRecoverable closePartFile() throws IOException { - RecoverableWriter.CommitRecoverable committable = null; + private CommitRecoverable closePartFile() throws IOException { + CommitRecoverable committable = null; if (inProgressPart != null) { committable = inProgressPart.closeForCommit(); pendingPartsForCurrentCheckpoint.add(committable); @@ -233,12 +244,21 @@ public class Bucket<IN, BucketID> { BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException { prepareBucketForCheckpointing(checkpointId); - RecoverableWriter.ResumeRecoverable inProgressResumable = null; + ResumeRecoverable inProgressResumable = null; long inProgressFileCreationTime = Long.MAX_VALUE; if (inProgressPart != null) { inProgressResumable = inProgressPart.persist(); inProgressFileCreationTime = inProgressPart.getCreationTime(); + + // the following is an optimization so that writers that do not + // require cleanup, they do not have to keep track of resumables + // and later iterate over the active buckets. 
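A note on the NavigableMap usage above: committables are drained with headMap(checkpointId, true) because parts pending for the acknowledged checkpoint itself must be committed, while resumables are cleaned with headMap(checkpointId, false) because the acknowledged checkpoint's own resumable is still needed to resume the in-progress file. A standalone illustration of the inclusivity flag (all names here are invented for the demo):

import java.util.NavigableMap;
import java.util.TreeMap;

public class HeadMapDemo {

	public static void main(String[] args) {
		final NavigableMap<Long, String> perCheckpoint = new TreeMap<>();
		perCheckpoint.put(0L, "zero");
		perCheckpoint.put(1L, "one");
		perCheckpoint.put(2L, "two");

		// exclusive: strictly before the acknowledged checkpoint -> outdated, safe to delete
		System.out.println(perCheckpoint.headMap(2L, false).keySet()); // prints [0, 1]

		// inclusive: up to and including the acknowledged checkpoint -> ready to commit
		System.out.println(perCheckpoint.headMap(2L, true).keySet()); // prints [0, 1, 2]
	}
}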
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
new file mode 100644
index 0000000..f328fd7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.fs.local.LocalRecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Bucket}.
+ */
+public class BucketTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void shouldNotCleanupResumablesThatArePartOfTheAckedCheckpoint() throws IOException {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+		assertThat(recoverableWriter, hasCalledDiscard(0)); // it did not discard, as the resumable is still valid.
+	}
+
+	@Test
+	public void shouldCleanupOutdatedResumablesOnCheckpointAck() throws IOException {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+		bucketUnderTest.onReceptionOfCheckpoint(1L);
+		bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+		assertThat(recoverableWriter, hasCalledDiscard(2)); // that is for checkpoints 0 and 1
+	}
+
+	@Test
+	public void shouldCleanupResumableAfterRestoring() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+		final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
+		restoreBucket(newRecoverableWriter, 0, 1, state);
+
+		assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for the resumable of checkpoint 0, from which we restored
+	}
+
+	@Test
+	public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasNoActiveInProgressFile());
+
+		bucketUnderTest.onReceptionOfCheckpoint(1L);
+		bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+		assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file.
+	}
+
+	// ------------------------------- Matchers --------------------------------
+
+	private static TypeSafeMatcher<TestRecoverableWriter> hasCalledDiscard(int times) {
+		return new TypeSafeMatcher<TestRecoverableWriter>() {
+			@Override
+			protected boolean matchesSafely(TestRecoverableWriter writer) {
+				return writer.getCleanupCallCounter() == times;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description
+						.appendText("the TestRecoverableWriter to have called cleanupRecoverableState() ")
+						.appendValue(times)
+						.appendText(" times.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BucketState<String>> hasActiveInProgressFile() {
+		return new TypeSafeMatcher<BucketState<String>>() {
+			@Override
+			protected boolean matchesSafely(BucketState<String> state) {
+				return state.getInProgressResumableFile() != null;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a BucketState with active in-progress file.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BucketState<String>> hasNoActiveInProgressFile() {
+		return new TypeSafeMatcher<BucketState<String>>() {
+			@Override
+			protected boolean matchesSafely(BucketState<String> state) {
+				return state.getInProgressResumableFile() == null;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a BucketState with no active in-progress file.");
+			}
+		};
+	}
+
+	// ------------------------------- Mock Classes --------------------------------
+
+	private static class TestRecoverableWriter extends LocalRecoverableWriter {
+
+		private int cleanupCallCounter = 0;
+
+		TestRecoverableWriter(LocalFileSystem fs) {
+			super(fs);
+		}
+
+		int getCleanupCallCounter() {
+			return cleanupCallCounter;
+		}
+
+		@Override
+		public boolean requiresCleanupOfRecoverableState() {
+			// we return true so that cleanupRecoverableState() gets called.
+			return true;
+		}
+
+		@Override
+		public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+			cleanupCallCounter++;
+			return false;
+		}
+
+		@Override
+		public String toString() {
+			return "TestRecoverableWriter has called cleanupRecoverableState() " + cleanupCallCounter + " times.";
+		}
+	}
+
+	// ------------------------------- Utility Methods --------------------------------
+
+	private static final String bucketId = "testing-bucket";
+
+	private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create().build();
+
+	private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
+			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+
+	private static Bucket<String, String> createBucket(
+			final RecoverableWriter writer,
+			final Path bucketPath,
+			final int subtaskIdx,
+			final int initialPartCounter) {
+
+		return Bucket.getNew(
+				writer,
+				subtaskIdx,
+				bucketId,
+				bucketPath,
+				initialPartCounter,
+				partFileFactory,
+				rollingPolicy);
+	}
+
+	private static Bucket<String, String> restoreBucket(
+			final RecoverableWriter writer,
+			final int subtaskIndex,
+			final long initialPartCounter,
+			final BucketState<String> bucketState) throws Exception {
+
+		return Bucket.restore(
+				writer,
+				subtaskIndex,
+				initialPartCounter,
+				partFileFactory,
+				rollingPolicy,
+				bucketState
+		);
+	}
+
+	private static TestRecoverableWriter getRecoverableWriter(Path path) {
+		try {
+			final FileSystem fs = FileSystem.get(path.toUri());
+			if (!(fs instanceof LocalFileSystem)) {
+				fail("Expected Local FS but got a " + fs.getClass().getName() + " for path: " + path);
+			}
+			return new TestRecoverableWriter((LocalFileSystem) fs);
+		} catch (IOException e) {
+			fail();
+		}
+		return null;
+	}
+}
