This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 99a8891fe841c32f9ec07d67a124f8eacd954af2 Author: Yun Gao <[email protected]> AuthorDate: Fri Nov 27 18:11:00 2020 +0800 [hotfix][fs-connector] Remove unused state serializer from FileWriter --- .../org/apache/flink/connector/file/sink/writer/FileWriter.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java index 5c2faaa..aaf0473 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java @@ -85,10 +85,6 @@ public class FileWriter<IN> implements private final OutputFileConfig outputFileConfig; - // --------------------------- State Related Fields ----------------------------- - - private final FileWriterBucketStateSerializer bucketStateSerializer; - /** * A constructor creating a new empty bucket manager. * @@ -119,9 +115,6 @@ public class FileWriter<IN> implements this.activeBuckets = new HashMap<>(); this.bucketerContext = new BucketerContext(); - this.bucketStateSerializer = new FileWriterBucketStateSerializer( - bucketWriter.getProperties().getInProgressFileRecoverableSerializer()); - this.processingTimeService = checkNotNull(processingTimeService); checkArgument( bucketCheckInterval > 0, @@ -216,7 +209,7 @@ public class FileWriter<IN> implements @Override public List<FileWriterBucketState> snapshotState() throws IOException { checkState( - bucketWriter != null && bucketStateSerializer != null, + bucketWriter != null, "sink has not been initialized"); List<FileWriterBucketState> state = new ArrayList<>();
