This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c1328aa4548953ff358da10052cc3644ec5c2f2
Author: Kostas Kloudas <[email protected]>
AuthorDate: Wed Nov 21 10:35:58 2018 +0100

    [hotfix][fs-connector] Refactor PartFileWriter to take stream.
---
 .../flink/streaming/api/functions/sink/filesystem/Bucket.java  |  7 +++++--
 .../api/functions/sink/filesystem/BulkPartWriter.java          | 10 ++++------
 .../api/functions/sink/filesystem/PartFileWriter.java          |  8 ++++----
 .../api/functions/sink/filesystem/RowWisePartWriter.java       | 10 ++++------
 4 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 65a7628..042bcda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
@@ -124,8 +125,9 @@ public class Bucket<IN, BucketID> {
                // we try to resume the previous in-progress file
                if (state.hasInProgressResumableFile()) {
                        final RecoverableWriter.ResumeRecoverable resumable = 
state.getInProgressResumableFile();
+                       final RecoverableFsDataOutputStream stream = 
fsWriter.recover(resumable);
                        inProgressPart = partFileFactory.resumeFrom(
-                                       bucketId, fsWriter, resumable, 
state.getInProgressFileCreationTime());
+                                       bucketId, stream, resumable, 
state.getInProgressFileCreationTime());
                }
        }
 
@@ -195,7 +197,8 @@ public class Bucket<IN, BucketID> {
                closePartFile();
 
                final Path partFilePath = assembleNewPartPath();
-               inProgressPart = partFileFactory.openNew(bucketId, fsWriter, 
partFilePath, currentTime);
+               final RecoverableFsDataOutputStream stream = 
fsWriter.open(partFilePath);
+               inProgressPart = partFileFactory.openNew(bucketId, stream, 
partFilePath, currentTime);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Subtask {} opening new part file \"{}\" for 
bucket id={}.",
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 005ae4e..a44b0e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -79,14 +79,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
                @Override
                public PartFileWriter<IN, BucketID> resumeFrom(
                                final BucketID bucketId,
-                               final RecoverableWriter fileSystemWriter,
+                               final RecoverableFsDataOutputStream stream,
                                final RecoverableWriter.ResumeRecoverable 
resumable,
                                final long creationTime) throws IOException {
 
-                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(stream);
                        Preconditions.checkNotNull(resumable);
 
-                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.recover(resumable);
                        final BulkWriter<IN> writer = 
writerFactory.create(stream);
                        return new BulkPartWriter<>(bucketId, stream, writer, 
creationTime);
                }
@@ -94,14 +93,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
                @Override
                public PartFileWriter<IN, BucketID> openNew(
                                final BucketID bucketId,
-                               final RecoverableWriter fileSystemWriter,
+                               final RecoverableFsDataOutputStream stream,
                                final Path path,
                                final long creationTime) throws IOException {
 
-                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(stream);
                        Preconditions.checkNotNull(path);
 
-                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.open(path);
                        final BulkWriter<IN> writer = 
writerFactory.create(stream);
                        return new BulkPartWriter<>(bucketId, stream, writer, 
creationTime);
                }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
index 662454b..95a2978a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -111,7 +111,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
                /**
                 * Used upon recovery from a failure to recover a {@link 
PartFileWriter writer}.
                 * @param bucketId the id of the bucket this writer is writing 
to.
-                * @param fileSystemWriter the filesystem-specific writer to 
use when writing to the filesystem.
+                * @param stream the filesystem-specific output stream to use 
when writing to the filesystem.
                 * @param resumable the state of the stream we are resurrecting.
                 * @param creationTime the creation time of the stream.
                 * @return the recovered {@link PartFileWriter writer}.
@@ -119,14 +119,14 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
                 */
                PartFileWriter<IN, BucketID> resumeFrom(
                        final BucketID bucketId,
-                       final RecoverableWriter fileSystemWriter,
+                       final RecoverableFsDataOutputStream stream,
                        final RecoverableWriter.ResumeRecoverable resumable,
                        final long creationTime) throws IOException;
 
                /**
                 * Used to create a new {@link PartFileWriter writer}.
                 * @param bucketId the id of the bucket this writer is writing 
to.
-                * @param fileSystemWriter the filesystem-specific writer to 
use when writing to the filesystem.
+                * @param stream the filesystem-specific output stream to use 
when writing to the filesystem.
                 * @param path the part this writer will write to.
                 * @param creationTime the creation time of the stream.
                 * @return the new {@link PartFileWriter writer}.
@@ -134,7 +134,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
                 */
                PartFileWriter<IN, BucketID> openNew(
                        final BucketID bucketId,
-                       final RecoverableWriter fileSystemWriter,
+                       final RecoverableFsDataOutputStream stream,
                        final Path path,
                        final long creationTime) throws IOException;
        }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 2478b79..05c160c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -67,28 +67,26 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
                @Override
                public PartFileWriter<IN, BucketID> resumeFrom(
                                final BucketID bucketId,
-                               final RecoverableWriter fileSystemWriter,
+                               final RecoverableFsDataOutputStream stream,
                                final RecoverableWriter.ResumeRecoverable 
resumable,
                                final long creationTime) throws IOException {
 
-                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(stream);
                        Preconditions.checkNotNull(resumable);
 
-                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.recover(resumable);
                        return new RowWisePartWriter<>(bucketId, stream, 
encoder, creationTime);
                }
 
                @Override
                public PartFileWriter<IN, BucketID> openNew(
                                final BucketID bucketId,
-                               final RecoverableWriter fileSystemWriter,
+                               final RecoverableFsDataOutputStream stream,
                                final Path path,
                                final long creationTime) throws IOException {
 
-                       Preconditions.checkNotNull(fileSystemWriter);
+                       Preconditions.checkNotNull(stream);
                        Preconditions.checkNotNull(path);
 
-                       final RecoverableFsDataOutputStream stream = 
fileSystemWriter.open(path);
                        return new RowWisePartWriter<>(bucketId, stream, 
encoder, creationTime);
                }
        }

Reply via email to