This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0c1328aa4548953ff358da10052cc3644ec5c2f2 Author: Kostas Kloudas <[email protected]> AuthorDate: Wed Nov 21 10:35:58 2018 +0100 [hotfix][fs-connector] Refactor PartFileWriter to take stream. --- .../flink/streaming/api/functions/sink/filesystem/Bucket.java | 7 +++++-- .../api/functions/sink/filesystem/BulkPartWriter.java | 10 ++++------ .../api/functions/sink/filesystem/PartFileWriter.java | 8 ++++---- .../api/functions/sink/filesystem/RowWisePartWriter.java | 10 ++++------ 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java index 65a7628..042bcda 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; import org.apache.flink.core.fs.RecoverableWriter; import org.apache.flink.util.Preconditions; @@ -124,8 +125,9 @@ public class Bucket<IN, BucketID> { // we try to resume the previous in-progress file if (state.hasInProgressResumableFile()) { final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile(); + final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable); inProgressPart = partFileFactory.resumeFrom( - bucketId, fsWriter, resumable, state.getInProgressFileCreationTime()); + bucketId, stream, resumable, state.getInProgressFileCreationTime()); } } @@ -195,7 +197,8 @@ public class Bucket<IN, BucketID> { closePartFile(); final Path partFilePath = assembleNewPartPath(); - inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime); + final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath); + inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime); if (LOG.isDebugEnabled()) { LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.", diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java index 005ae4e..a44b0e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java @@ -79,14 +79,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> { @Override public PartFileWriter<IN, BucketID> resumeFrom( final BucketID bucketId, - final RecoverableWriter fileSystemWriter, + final RecoverableFsDataOutputStream stream, final RecoverableWriter.ResumeRecoverable resumable, final long creationTime) throws IOException { - Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(stream); Preconditions.checkNotNull(resumable); - final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable); final BulkWriter<IN> writer = writerFactory.create(stream); return new BulkPartWriter<>(bucketId, stream, writer, creationTime); } @@ -94,14 +93,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> { @Override public PartFileWriter<IN, BucketID> openNew( final BucketID bucketId, - final RecoverableWriter fileSystemWriter, + final RecoverableFsDataOutputStream stream, final Path path, final long creationTime) throws IOException { - Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(stream); Preconditions.checkNotNull(path); - final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path); final BulkWriter<IN> writer = writerFactory.create(stream); return new BulkPartWriter<>(bucketId, stream, writer, creationTime); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java index 662454b..95a2978a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java @@ -111,7 +111,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> { /** * Used upon recovery from a failure to recover a {@link PartFileWriter writer}. * @param bucketId the id of the bucket this writer is writing to. - * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem. + * @param stream the filesystem-specific output stream to use when writing to the filesystem. * @param resumable the state of the stream we are resurrecting. * @param creationTime the creation time of the stream. * @return the recovered {@link PartFileWriter writer}. @@ -119,14 +119,14 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> { */ PartFileWriter<IN, BucketID> resumeFrom( final BucketID bucketId, - final RecoverableWriter fileSystemWriter, + final RecoverableFsDataOutputStream stream, final RecoverableWriter.ResumeRecoverable resumable, final long creationTime) throws IOException; /** * Used to create a new {@link PartFileWriter writer}. * @param bucketId the id of the bucket this writer is writing to. - * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem. + * @param stream the filesystem-specific output stream to use when writing to the filesystem. * @param path the part this writer will write to. * @param creationTime the creation time of the stream. * @return the new {@link PartFileWriter writer}. @@ -134,7 +134,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> { */ PartFileWriter<IN, BucketID> openNew( final BucketID bucketId, - final RecoverableWriter fileSystemWriter, + final RecoverableFsDataOutputStream stream, final Path path, final long creationTime) throws IOException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java index 2478b79..05c160c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java @@ -67,28 +67,26 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> @Override public PartFileWriter<IN, BucketID> resumeFrom( final BucketID bucketId, - final RecoverableWriter fileSystemWriter, + final RecoverableFsDataOutputStream stream, final RecoverableWriter.ResumeRecoverable resumable, final long creationTime) throws IOException { - Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(stream); Preconditions.checkNotNull(resumable); - final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable); return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime); } @Override public PartFileWriter<IN, BucketID> openNew( final BucketID bucketId, - final RecoverableWriter fileSystemWriter, + final RecoverableFsDataOutputStream stream, final Path path, final long creationTime) throws IOException { - Preconditions.checkNotNull(fileSystemWriter); + Preconditions.checkNotNull(stream); Preconditions.checkNotNull(path); - final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path); return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime); } }
