This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d0757e95c3c33d4f13377f2c14c179076b3795b2
Author: Yun Gao <[email protected]>
AuthorDate: Wed Nov 25 23:29:38 2020 +0800

    [FLINK-20337] Let FileSink restore state from StreamingFileSink
    
    The FileWriterBucketStateSerializer is bumped past version 2 to allow it
    to restore the same old versions as BucketStateSerialize (from
    StreamingFileSink).
---
 .../apache/flink/connector/file/sink/FileSink.java |  14 +--
 .../file/sink/writer/FileWriterBucket.java         |  12 +++
 .../file/sink/writer/FileWriterBucketState.java    |  33 +++++++
 .../writer/FileWriterBucketStateSerializer.java    | 110 ++++++++++++++++++---
 .../FileWriterBucketStateSerializerTest.java       |   4 +-
 .../file/sink/writer/FileWriterBucketTest.java     |  59 +++++++++++
 .../OutputStreamBasedPartFileWriter.java           |  23 +++--
 7 files changed, 229 insertions(+), 26 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 13d36d3..ccd010d 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -293,10 +293,11 @@ public class FileSink<IN>
 
                @Override
                SimpleVersionedSerializer<FileWriterBucketState> 
getWriterStateSerializer() throws IOException {
+                       BucketWriter<IN, String> bucketWriter = 
createBucketWriter();
+
                        return new FileWriterBucketStateSerializer(
-                                       createBucketWriter()
-                                                       .getProperties()
-                                                       
.getInProgressFileRecoverableSerializer());
+                                       
bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+                                       
bucketWriter.getProperties().getPendingFileRecoverableSerializer());
                }
 
                @Override
@@ -444,10 +445,11 @@ public class FileSink<IN>
 
                @Override
                SimpleVersionedSerializer<FileWriterBucketState> 
getWriterStateSerializer() throws IOException {
+                       BucketWriter<IN, String> bucketWriter = 
createBucketWriter();
+
                        return new FileWriterBucketStateSerializer(
-                                       createBucketWriter()
-                                                       .getProperties()
-                                                       
.getInProgressFileRecoverableSerializer());
+                                       
bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+                                       
bucketWriter.getProperties().getPendingFileRecoverableSerializer());
                }
 
                @Override
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
index 33a4469..5cb135a 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
@@ -118,6 +118,10 @@ class FileWriterBucket<IN> {
                                outputFileConfig);
 
                restoreInProgressFile(bucketState);
+
+               // Restore pending files, this only make difference if we are
+               // migrating from {@code StreamingFileSink}.
+               cacheRecoveredPendingFiles(bucketState);
        }
 
        private void restoreInProgressFile(FileWriterBucketState state) throws 
IOException {
@@ -137,6 +141,14 @@ class FileWriterBucket<IN> {
                }
        }
 
+       private void cacheRecoveredPendingFiles(FileWriterBucketState state) {
+               // Cache the previous pending files and send to committer on 
the first prepareCommit operation.
+               for (List<InProgressFileWriter.PendingFileRecoverable> 
restoredPendingRecoverables :
+                               
state.getPendingFileRecoverablesPerCheckpoint().values()) {
+                       pendingFiles.addAll(restoredPendingRecoverables);
+               }
+       }
+
        public String getBucketId() {
                return bucketId;
        }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java
index 72f88c1..cbab95e 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketState.java
@@ -23,7 +23,12 @@ import org.apache.flink.core.fs.Path;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import static 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
 
 /**
  * States for {@link FileWriterBucket}.
@@ -49,15 +54,31 @@ public class FileWriterBucketState {
        @Nullable
        private final InProgressFileRecoverable inProgressFileRecoverable;
 
+       /**
+        * The {@link PendingFileRecoverable} should be empty unless we
+        * are migrating from {@code StreamingFileSink}.
+        */
+       private final Map<Long, List<PendingFileRecoverable>> 
pendingFileRecoverablesPerCheckpoint;
+
        public FileWriterBucketState(
                        String bucketId,
                        Path bucketPath,
                        long inProgressFileCreationTime,
                        @Nullable InProgressFileRecoverable 
inProgressFileRecoverable) {
+               this(bucketId, bucketPath, inProgressFileCreationTime, 
inProgressFileRecoverable, new HashMap<>());
+       }
+
+       public FileWriterBucketState(
+                       String bucketId,
+                       Path bucketPath,
+                       long inProgressFileCreationTime,
+                       @Nullable InProgressFileRecoverable 
inProgressFileRecoverable,
+                       Map<Long, List<PendingFileRecoverable>> 
pendingFileRecoverablesPerCheckpoint) {
                this.bucketId = bucketId;
                this.bucketPath = bucketPath;
                this.inProgressFileCreationTime = inProgressFileCreationTime;
                this.inProgressFileRecoverable = inProgressFileRecoverable;
+               this.pendingFileRecoverablesPerCheckpoint = 
pendingFileRecoverablesPerCheckpoint;
        }
 
        public String getBucketId() {
@@ -81,6 +102,10 @@ public class FileWriterBucketState {
                return inProgressFileRecoverable != null;
        }
 
+       public Map<Long, List<PendingFileRecoverable>> 
getPendingFileRecoverablesPerCheckpoint() {
+               return pendingFileRecoverablesPerCheckpoint;
+       }
+
        @Override
        public String toString() {
                final StringBuilder strBuilder = new StringBuilder();
@@ -92,6 +117,14 @@ public class FileWriterBucketState {
                if (hasInProgressFileRecoverable()) {
                        strBuilder.append(", has open part file created @ 
").append(inProgressFileCreationTime);
                }
+
+               if (!pendingFileRecoverablesPerCheckpoint.isEmpty()) {
+                       strBuilder.append(", has pending files for checkpoints: 
{");
+                       for (long checkpointId: 
pendingFileRecoverablesPerCheckpoint.keySet()) {
+                               strBuilder.append(checkpointId).append(' ');
+                       }
+                       strBuilder.append('}');
+               }
                return strBuilder.toString();
        }
 }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java
index a40cddb9..659ee0d 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializer.java
@@ -20,17 +20,28 @@ package org.apache.flink.connector.file.sink.writer;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.DataOutputView;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
+import org.apache.flink.util.function.BiFunctionWithException;
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 
 import static 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -44,22 +55,25 @@ public class FileWriterBucketStateSerializer
 
        private final SimpleVersionedSerializer<InProgressFileRecoverable> 
inProgressFileRecoverableSerializer;
 
+       private final SimpleVersionedSerializer<PendingFileRecoverable> 
pendingFileRecoverableSerializer;
+
        public FileWriterBucketStateSerializer(
-                       SimpleVersionedSerializer<InProgressFileRecoverable> 
inProgressFileRecoverableSerializer) {
-               this.inProgressFileRecoverableSerializer = checkNotNull(
-                               inProgressFileRecoverableSerializer);
+                       SimpleVersionedSerializer<InProgressFileRecoverable> 
inProgressFileRecoverableSerializer,
+                       SimpleVersionedSerializer<PendingFileRecoverable> 
pendingFileRecoverableSerializer) {
+               this.inProgressFileRecoverableSerializer = 
checkNotNull(inProgressFileRecoverableSerializer);
+               this.pendingFileRecoverableSerializer = 
checkNotNull(pendingFileRecoverableSerializer);
        }
 
        @Override
        public int getVersion() {
-               return 1;
+               return 3;
        }
 
        @Override
        public byte[] serialize(FileWriterBucketState state) throws IOException 
{
                DataOutputSerializer out = new DataOutputSerializer(256);
                out.writeInt(MAGIC_NUMBER);
-               serializeV1(state, out);
+               serializeV3(state, out);
                return out.getCopyOfBuffer();
        }
 
@@ -73,12 +87,18 @@ public class FileWriterBucketStateSerializer
                        case 1:
                                validateMagicNumber(in);
                                return deserializeV1(in);
+                       case 2:
+                               validateMagicNumber(in);
+                               return deserializeV2(in);
+                       case 3:
+                               validateMagicNumber(in);
+                               return deserializeV3(in);
                        default:
                                throw new IOException("Unrecognized version or 
corrupt state: " + version);
                }
        }
 
-       private void serializeV1(
+       private void serializeV3(
                        FileWriterBucketState state,
                        DataOutputView dataOutputView) throws IOException {
                SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -102,7 +122,41 @@ public class FileWriterBucketStateSerializer
                }
        }
 
-       private FileWriterBucketState deserializeV1(DataInputView 
dataInputView) throws IOException {
+       private FileWriterBucketState deserializeV1(DataInputView in) throws 
IOException {
+               final 
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> 
commitableSerializer = getCommitableSerializer();
+               final 
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> 
resumableSerializer = getResumableSerializer();
+
+               return internalDeserialize(
+                               in,
+                               dataInputView -> new 
OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
+                                               
SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, 
dataInputView)),
+                               (version, bytes) -> new 
OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(
+                                               
commitableSerializer.deserialize(version, bytes)));
+       }
+
+       private FileWriterBucketState deserializeV2(DataInputView in) throws 
IOException {
+               return internalDeserialize(
+                               in,
+                               dataInputView -> 
SimpleVersionedSerialization.readVersionAndDeSerialize(
+                                               
inProgressFileRecoverableSerializer,
+                                               dataInputView),
+                               pendingFileRecoverableSerializer::deserialize);
+       }
+
+       private FileWriterBucketState deserializeV3(DataInputView in) throws 
IOException {
+               return internalDeserialize(
+                               in,
+                               dataInputView -> 
SimpleVersionedSerialization.readVersionAndDeSerialize(
+                                               
inProgressFileRecoverableSerializer,
+                                               dataInputView),
+                               null);
+       }
+
+       private FileWriterBucketState internalDeserialize(
+                       DataInputView dataInputView,
+                       FunctionWithException<DataInputView, 
InProgressFileRecoverable, IOException> inProgressFileParser,
+                       @Nullable BiFunctionWithException<Integer, byte[], 
PendingFileRecoverable, IOException> pendingFileParser) throws IOException {
+
                String bucketId = 
SimpleVersionedSerialization.readVersionAndDeSerialize(
                                SimpleVersionedStringSerializer.INSTANCE,
                                dataInputView);
@@ -112,19 +166,37 @@ public class FileWriterBucketStateSerializer
                // then get the current resumable stream
                InProgressFileRecoverable current = null;
                if (dataInputView.readBoolean()) {
-                       current = 
SimpleVersionedSerialization.readVersionAndDeSerialize(
-                                       inProgressFileRecoverableSerializer,
-                                       dataInputView);
+                       current = inProgressFileParser.apply(dataInputView);
+               }
+
+               HashMap<Long, 
List<InProgressFileWriter.PendingFileRecoverable>> 
pendingFileRecoverablesPerCheckpoint = new HashMap<>();
+               if (pendingFileParser != null) {
+                       final int pendingFileRecoverableSerializerVersion = 
dataInputView.readInt();
+                       final int numCheckpoints = dataInputView.readInt();
+
+                       for (int i = 0; i < numCheckpoints; i++) {
+                               final long checkpointId = 
dataInputView.readLong();
+                               final int numOfPendingFileRecoverables = 
dataInputView.readInt();
+
+                               final 
List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new 
ArrayList<>(numOfPendingFileRecoverables);
+                               for (int j = 0; j < 
numOfPendingFileRecoverables; j++) {
+                                       final byte[] bytes = new 
byte[dataInputView.readInt()];
+                                       dataInputView.readFully(bytes);
+                                       
pendingFileRecoverables.add(pendingFileParser.apply(pendingFileRecoverableSerializerVersion,
 bytes));
+                               }
+                               
pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables);
+                       }
                }
 
                return new FileWriterBucketState(
                                bucketId,
                                new Path(bucketPathStr),
                                creationTime,
-                               current);
+                               current,
+                               pendingFileRecoverablesPerCheckpoint);
        }
 
-       private static void validateMagicNumber(DataInputView in) throws 
IOException {
+       private void validateMagicNumber(DataInputView in) throws IOException {
                int magicNumber = in.readInt();
                if (magicNumber != MAGIC_NUMBER) {
                        throw new IOException(String.format(
@@ -132,4 +204,18 @@ public class FileWriterBucketStateSerializer
                                        magicNumber));
                }
        }
+
+       private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> 
getResumableSerializer() {
+               final 
OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
+                               
outputStreamBasedInProgressFileRecoverableSerializer =
+                               
(OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer)
 inProgressFileRecoverableSerializer;
+               return 
outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer();
+       }
+
+       private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> 
getCommitableSerializer() {
+               final 
OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
+                               
outputStreamBasedPendingFileRecoverableSerializer =
+                               
(OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer)
 pendingFileRecoverableSerializer;
+               return 
outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer();
+       }
 }
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java
index 445d00b6..d173434 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerTest.java
@@ -70,7 +70,9 @@ public class FileWriterBucketStateSerializerTest {
        private FileWriterBucketState 
serializeAndDeserialize(FileWriterBucketState bucketState) throws IOException {
                FileWriterBucketStateSerializer serializer = new 
FileWriterBucketStateSerializer(
                                new 
FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
-                                               
FileSinkTestUtils.TestInProgressFileRecoverable::new));
+                                               
FileSinkTestUtils.TestInProgressFileRecoverable::new),
+                               new 
FileSinkTestUtils.SimpleVersionedWrapperSerializer<>(
+                                               
FileSinkTestUtils.TestPendingFileRecoverable::new));
                byte[] data = serializer.serialize(bucketState);
                return serializer.deserialize(serializer.getVersion(), data);
        }
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
index 4d3f805..ec91dd7 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.fs.local.LocalRecoverableWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
 import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
@@ -45,7 +46,10 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertEquals;
@@ -254,6 +258,28 @@ public class FileWriterBucketTest {
                                bucketState.getInProgressFileRecoverable());
        }
 
+       /**
+        * Tests restoring with state containing pending files. This might 
happen
+        * if we are migrating from {@code StreamingFileSink}.
+        */
+       @Test
+       public void testRestoringWithOnlyPendingFiles() throws IOException {
+               final int noOfPendingFileCheckpoints = 4;
+
+               StubResumableWriter resumableWriter = new StubResumableWriter();
+               FileWriterBucket<String> bucket = 
getRestoredBucketWithOnlyPendingFiles(
+                               resumableWriter,
+                               DEFAULT_ROLLING_POLICY,
+                               noOfPendingFileCheckpoints);
+               assertNull("There should be no in-progress file", 
bucket.getInProgressPart());
+               // There is one pending file for each checkpoint
+               assertEquals(noOfPendingFileCheckpoints, 
bucket.getPendingFiles().size());
+
+               List<FileSinkCommittable> fileSinkCommittables = 
bucket.prepareCommit(false);
+               bucket.snapshotState();
+               compareNumberOfPendingAndInProgress(fileSinkCommittables, 
noOfPendingFileCheckpoints, 0);
+       }
+
        @Test
        public void testMergeWithInprogressFileNotSupportResume() throws 
IOException {
                FileWriterBucket<String> bucket1 = 
getRestoredBucketWithOnlyInProgressPart(
@@ -487,4 +513,37 @@ public class FileWriterBucketTest {
                                stateWithOnlyInProgressFile,
                                OutputFileConfig.builder().build());
        }
+
+       private FileWriterBucket<String> getRestoredBucketWithOnlyPendingFiles(
+                       BaseStubWriter writer,
+                       RollingPolicy<String, String> rollingPolicy,
+                       int numberOfPendingParts) throws IOException {
+
+               Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> 
completePartsPerCheckpoint =
+                               
createPendingPartsPerCheckpoint(numberOfPendingParts);
+
+               FileWriterBucketState state =
+                               new FileWriterBucketState(
+                                               "test",
+                                               new 
Path("file:///fake/fakefile"),
+                                               12345L,
+                                               null,
+                                               completePartsPerCheckpoint);
+
+               return FileWriterBucket.restore(
+                               new RowWiseBucketWriter<>(writer, ENCODER),
+                               rollingPolicy,
+                               state,
+                               OutputFileConfig.builder().build());
+       }
+
+       private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> 
createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+               final Map<Long, 
List<InProgressFileWriter.PendingFileRecoverable>> 
pendingCommittablesPerCheckpoint = new HashMap<>();
+               for (int checkpointId = 0; checkpointId < noOfCheckpoints; 
checkpointId++) {
+                       final List<InProgressFileWriter.PendingFileRecoverable> 
pending = new ArrayList<>();
+                       pending.add(new 
OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new 
NoOpRecoverable()));
+                       pendingCommittablesPerCheckpoint.put((long) 
checkpointId, pending);
+               }
+               return pendingCommittablesPerCheckpoint;
+       }
 }
diff --git 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
index 64c2bae..715bf33 100644
--- 
a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
+++ 
b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -135,11 +135,14 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID> extends Abst
                        final long creationTime) throws IOException;
        }
 
-       static final class OutputStreamBasedPendingFileRecoverable implements 
PendingFileRecoverable {
+       /**
+        * The {@link PendingFileRecoverable} implementation for {@link 
OutputStreamBasedBucketWriter}.
+        */
+       public static final class OutputStreamBasedPendingFileRecoverable 
implements PendingFileRecoverable {
 
                private final RecoverableWriter.CommitRecoverable 
commitRecoverable;
 
-               OutputStreamBasedPendingFileRecoverable(final 
RecoverableWriter.CommitRecoverable commitRecoverable) {
+               public OutputStreamBasedPendingFileRecoverable(final 
RecoverableWriter.CommitRecoverable commitRecoverable) {
                        this.commitRecoverable = commitRecoverable;
                }
 
@@ -149,7 +152,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID> extends Abst
        }
 
        /**
-        * A kind of {@link InProgressFileRecoverable} implementation.
+        * The {@link InProgressFileRecoverable} implementation for {@link 
OutputStreamBasedBucketWriter}.
         */
        public static final class OutputStreamBasedInProgressFileRecoverable 
implements InProgressFileRecoverable {
 
@@ -183,7 +186,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID> extends Abst
                }
        }
 
-       static class OutputStreamBasedInProgressFileRecoverableSerializer 
implements SimpleVersionedSerializer<InProgressFileRecoverable> {
+       /**
+        * The serializer for {@link 
OutputStreamBasedInProgressFileRecoverable}.
+        */
+       public static class 
OutputStreamBasedInProgressFileRecoverableSerializer implements 
SimpleVersionedSerializer<InProgressFileRecoverable> {
 
                private static final int MAGIC_NUMBER = 0xb3a4073d;
 
@@ -219,7 +225,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID> extends Abst
                        }
                }
 
-               SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> 
getResumeSerializer() {
+               public 
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> 
getResumeSerializer() {
                        return resumeSerializer;
                }
 
@@ -239,7 +245,10 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID> extends Abst
                }
        }
 
-       static class OutputStreamBasedPendingFileRecoverableSerializer 
implements SimpleVersionedSerializer<PendingFileRecoverable> {
+       /**
+        * The serializer for {@link OutputStreamBasedPendingFileRecoverable}.
+        */
+       public static class OutputStreamBasedPendingFileRecoverableSerializer 
implements SimpleVersionedSerializer<PendingFileRecoverable> {
 
                private static final int MAGIC_NUMBER = 0x2c853c89;
 
@@ -276,7 +285,7 @@ public abstract class OutputStreamBasedPartFileWriter<IN, 
BucketID> extends Abst
                        }
                }
 
-               SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> 
getCommitSerializer() {
+               public 
SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> 
getCommitSerializer() {
                        return this.commitSerializer;
                }
 

Reply via email to