This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a9fe5d431cc7722a0af0ab2d6a95460e7e38e77
Author: Gao Yun <[email protected]>
AuthorDate: Wed May 27 10:46:46 2020 +0800

    [FLINK-17934][fs-connector] Add listener to Buckets and remove listener for 
BucketsBuilder
---
 .../flink/connectors/hive/HiveTableSink.java       |  7 +---
 .../HadoopPathBasedBulkFormatBuilder.java          | 13 ------
 .../sink/filesystem/BucketLifeCycleListener.java   |  4 +-
 .../api/functions/sink/filesystem/Buckets.java     | 12 +++---
 .../sink/filesystem/StreamingFileSink.java         | 18 ---------
 .../sink/filesystem/BucketAssignerITCases.java     |  1 -
 .../api/functions/sink/filesystem/BucketsTest.java | 17 ++++----
 .../sink/filesystem/RollingPolicyTest.java         |  1 -
 .../table/filesystem/FileSystemTableSink.java      | 12 ++----
 .../filesystem/stream/InactiveBucketListener.java  | 46 ----------------------
 .../filesystem/stream/StreamingFileWriter.java     | 28 ++++++++-----
 11 files changed, 39 insertions(+), 120 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index aa83d7a..35e355c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -46,7 +46,6 @@ import 
org.apache.flink.table.filesystem.FileSystemOutputFormat;
 import org.apache.flink.table.filesystem.FileSystemTableSink;
 import 
org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
 import 
org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy;
-import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
@@ -185,16 +184,14 @@ public class HiveTableSink implements 
AppendStreamTableSink, PartitionableTableS
                                                true,
                                                
conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
                                                
conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis());
-                               InactiveBucketListener listener = new 
InactiveBucketListener();
 
                                Optional<BulkWriter.Factory<RowData>> 
bulkFactory = createBulkWriterFactory(partitionColumns, sd);
-                               BucketsBuilder<RowData, ?, ? extends 
BucketsBuilder<RowData, ?, ?>> builder;
+                               BucketsBuilder<RowData, String, ? extends 
BucketsBuilder<RowData, ?, ?>> builder;
                                if (userMrWriter || !bulkFactory.isPresent()) {
                                        HiveBulkWriterFactory hadoopBulkFactory 
= new HiveBulkWriterFactory(recordWriterFactory);
                                        builder = new 
HadoopPathBasedBulkFormatBuilder<>(
                                                        new 
Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
                                                        
.withRollingPolicy(rollingPolicy)
-                                                       
.withBucketLifeCycleListener(listener)
                                                        
.withOutputFileConfig(outputFileConfig);
                                        LOG.info("Hive streaming sink: Use 
MapReduce RecordWriter writer.");
                                } else {
@@ -202,7 +199,6 @@ public class HiveTableSink implements 
AppendStreamTableSink, PartitionableTableS
                                                        new 
org.apache.flink.core.fs.Path(sd.getLocation()),
                                                        new 
FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
                                                        
.withBucketAssigner(assigner)
-                                                       
.withBucketLifeCycleListener(listener)
                                                        
.withRollingPolicy(rollingPolicy)
                                                        
.withOutputFileConfig(outputFileConfig);
                                        LOG.info("Hive streaming sink: Use 
native parquet&orc writer.");
@@ -215,7 +211,6 @@ public class HiveTableSink implements 
AppendStreamTableSink, PartitionableTableS
                                                overwrite,
                                                dataStream,
                                                builder,
-                                               listener,
                                                msFactory);
                        }
                } catch (TException e) {
diff --git 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
 
b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
index df51ff4..6ec8651 100644
--- 
a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
+++ 
b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.hadoop.bulk.DefaultHadoopFileCommitterFactory;
 import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
@@ -30,8 +29,6 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 
 /**
@@ -54,9 +51,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T 
extends HadoopPath
 
        private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
 
-       @Nullable
-       private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
        private BucketFactory<IN, BucketID> bucketFactory;
 
        private OutputFileConfig outputFileConfig;
@@ -108,12 +102,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, 
BucketID, T extends HadoopPath
                return self();
        }
 
-       @Internal
-       public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, 
BucketID> listener) {
-               this.bucketLifeCycleListener = 
Preconditions.checkNotNull(listener);
-               return self();
-       }
-
        public T withBucketFactory(BucketFactory<IN, BucketID> factory) {
                this.bucketFactory = Preconditions.checkNotNull(factory);
                return self();
@@ -140,7 +128,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, 
T extends HadoopPath
                                writerFactory,
                                fileCommitterFactory),
                        rollingPolicy,
-                       bucketLifeCycleListener,
                        subtaskIndex,
                        outputFileConfig);
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
index f90f2a8..6667196 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
@@ -20,13 +20,11 @@ package 
org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 
-import java.io.Serializable;
-
 /**
  * Listener about the status of {@link Bucket}.
  */
 @Internal
-public interface BucketLifeCycleListener<IN, BucketID> extends Serializable {
+public interface BucketLifeCycleListener<IN, BucketID> {
 
        /**
         * Notifies a new bucket has been created.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 0c9b73f..39acc29 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -63,9 +63,6 @@ public class Buckets<IN, BucketID> {
 
        private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-       @Nullable
-       private final BucketLifeCycleListener<IN, BucketID> 
bucketLifeCycleListener;
-
        // --------------------------- runtime fields 
-----------------------------
 
        private final int subtaskIndex;
@@ -78,6 +75,9 @@ public class Buckets<IN, BucketID> {
 
        private final OutputFileConfig outputFileConfig;
 
+       @Nullable
+       private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
+
        // --------------------------- State Related Fields 
-----------------------------
 
        private final BucketStateSerializer<BucketID> bucketStateSerializer;
@@ -97,7 +97,6 @@ public class Buckets<IN, BucketID> {
                        final BucketFactory<IN, BucketID> bucketFactory,
                        final BucketWriter<IN, BucketID> bucketWriter,
                        final RollingPolicy<IN, BucketID> rollingPolicy,
-                       @Nullable final BucketLifeCycleListener<IN, BucketID> 
bucketLifeCycleListener,
                        final int subtaskIndex,
                        final OutputFileConfig outputFileConfig) {
 
@@ -106,7 +105,6 @@ public class Buckets<IN, BucketID> {
                this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
                this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
                this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
-               this.bucketLifeCycleListener = bucketLifeCycleListener;
                this.subtaskIndex = subtaskIndex;
 
                this.outputFileConfig = 
Preconditions.checkNotNull(outputFileConfig);
@@ -121,6 +119,10 @@ public class Buckets<IN, BucketID> {
                this.maxPartCounter = 0L;
        }
 
+       public void setBucketLifeCycleListener(BucketLifeCycleListener<IN, 
BucketID> bucketLifeCycleListener) {
+               this.bucketLifeCycleListener = 
Preconditions.checkNotNull(bucketLifeCycleListener);
+       }
+
        /**
         * Initializes the state after recovery from a failure.
         *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0962799..407420d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -189,8 +189,6 @@ public class StreamingFileSink<IN>
 
                private OutputFileConfig outputFileConfig;
 
-               private BucketLifeCycleListener<IN, BucketID> 
bucketLifeCycleListener;
-
                protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, 
BucketAssigner<IN, BucketID> bucketAssigner) {
                        this(basePath, encoder, bucketAssigner, 
DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new 
DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
                }
@@ -231,12 +229,6 @@ public class StreamingFileSink<IN>
                        return self();
                }
 
-               @Internal
-               public T withBucketLifeCycleListener(final 
BucketLifeCycleListener<IN, BucketID> listener) {
-                       this.bucketLifeCycleListener = 
Preconditions.checkNotNull(listener);
-                       return self();
-               }
-
                public T withOutputFileConfig(final OutputFileConfig 
outputFileConfig) {
                        this.outputFileConfig = outputFileConfig;
                        return self();
@@ -267,7 +259,6 @@ public class StreamingFileSink<IN>
                                        bucketFactory,
                                        new 
RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(),
 encoder),
                                        rollingPolicy,
-                                       bucketLifeCycleListener,
                                        subtaskIndex,
                                        outputFileConfig);
                }
@@ -303,8 +294,6 @@ public class StreamingFileSink<IN>
 
                private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
 
-               private BucketLifeCycleListener<IN, BucketID> 
bucketLifeCycleListener;
-
                private BucketFactory<IN, BucketID> bucketFactory;
 
                private OutputFileConfig outputFileConfig;
@@ -350,12 +339,6 @@ public class StreamingFileSink<IN>
                        return self();
                }
 
-               @Internal
-               public T withBucketLifeCycleListener(final 
BucketLifeCycleListener<IN, BucketID> listener) {
-                       this.bucketLifeCycleListener = 
Preconditions.checkNotNull(listener);
-                       return self();
-               }
-
                @VisibleForTesting
                T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
                        this.bucketFactory = 
Preconditions.checkNotNull(factory);
@@ -387,7 +370,6 @@ public class StreamingFileSink<IN>
                                        bucketFactory,
                                        new 
BulkBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), 
writerFactory),
                                        rollingPolicy,
-                                       bucketLifeCycleListener,
                                        subtaskIndex,
                                        outputFileConfig);
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index ff2cc5a..e51043e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -57,7 +57,6 @@ public class BucketAssignerITCases {
                        new DefaultBucketFactoryImpl<>(),
                        new 
RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(),
 new SimpleStringEncoder<>()),
                        rollingPolicy,
-                       null,
                        0,
                        OutputFileConfig.builder().build()
                );
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 8e2117a..9707ec7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -36,8 +36,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -324,7 +322,6 @@ public class BucketsTest {
                                new DefaultBucketFactoryImpl<>(),
                                new 
RowWiseBucketWriter<>(FileSystem.get(path.toUri()).createRecoverableWriter(), 
new SimpleStringEncoder<>()),
                                DefaultRollingPolicy.builder().build(),
-                               null,
                                2,
                                OutputFileConfig.builder().build()
                );
@@ -452,19 +449,23 @@ public class BucketsTest {
        private static Buckets<String, String> createBuckets(
                        final Path basePath,
                        final RollingPolicy<String, String> rollingPolicy,
-                       @Nullable final BucketLifeCycleListener<String, String> 
bucketLifeCycleListener,
+                       final BucketLifeCycleListener<String, String> 
bucketLifeCycleListener,
                        final int subtaskIdx,
                        final OutputFileConfig outputFileConfig) throws 
IOException {
-               return new Buckets<>(
+               Buckets<String, String> buckets = new Buckets<>(
                                basePath,
                                new TestUtils.StringIdentityBucketAssigner(),
                                new DefaultBucketFactoryImpl<>(),
                                new 
RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(),
 new SimpleStringEncoder<>()),
                                rollingPolicy,
-                               bucketLifeCycleListener,
                                subtaskIdx,
-                               outputFileConfig
-               );
+                               outputFileConfig);
+
+               if (bucketLifeCycleListener != null) {
+                       
buckets.setBucketLifeCycleListener(bucketLifeCycleListener);
+               }
+
+               return buckets;
        }
 
        private static Buckets<String, String> restoreBuckets(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 2a4da34..1dbd30f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -204,7 +204,6 @@ public class RollingPolicyTest {
                                new DefaultBucketFactoryImpl<>(),
                                new 
RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(),
 new SimpleStringEncoder<>()),
                                rollingPolicyToTest,
-                               null,
                                0,
                                OutputFileConfig.builder().build()
                );
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 8ac6c0b..6408145 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -46,7 +46,6 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
 import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
 import 
org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
 import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
@@ -150,21 +149,18 @@ public class FileSystemTableSink implements
                                        conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
                                        
conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis());
 
-                       BucketsBuilder<RowData, ?, ? extends 
BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
-                       InactiveBucketListener listener = new 
InactiveBucketListener();
+                       BucketsBuilder<RowData, String, ? extends 
BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
                        if (writer instanceof Encoder) {
                                //noinspection unchecked
                                bucketsBuilder = StreamingFileSink.forRowFormat(
                                                path, new 
ProjectionEncoder((Encoder<RowData>) writer, computer))
                                                .withBucketAssigner(assigner)
-                                               
.withBucketLifeCycleListener(listener)
                                                
.withRollingPolicy(rollingPolicy);
                        } else {
                                //noinspection unchecked
                                bucketsBuilder = 
StreamingFileSink.forBulkFormat(
                                                path, new 
ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
                                                .withBucketAssigner(assigner)
-                                               
.withBucketLifeCycleListener(listener)
                                                
.withRollingPolicy(rollingPolicy);
                        }
                        return createStreamingSink(
@@ -175,7 +171,6 @@ public class FileSystemTableSink implements
                                        overwrite,
                                        dataStream,
                                        bucketsBuilder,
-                                       listener,
                                        metaStoreFactory);
                }
        }
@@ -187,15 +182,14 @@ public class FileSystemTableSink implements
                        ObjectIdentifier tableIdentifier,
                        boolean overwrite,
                        DataStream<RowData> inputStream,
-                       BucketsBuilder<RowData, ?, ? extends 
BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
-                       InactiveBucketListener listener,
+                       BucketsBuilder<RowData, String, ? extends 
BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
                        TableMetaStoreFactory msFactory) {
                if (overwrite) {
                        throw new IllegalStateException("Streaming mode not 
support overwrite.");
                }
 
                StreamingFileWriter fileWriter = new StreamingFileWriter(
-                               BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, 
bucketsBuilder, listener);
+                               BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, 
bucketsBuilder);
                DataStream<CommitMessage> writerStream = inputStream.transform(
                                StreamingFileWriter.class.getSimpleName(),
                                
TypeExtractor.createTypeInfo(CommitMessage.class),
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java
deleted file mode 100644
index 69ab147..0000000
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.filesystem.stream;
-
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
-import 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
-import org.apache.flink.table.data.RowData;
-
-import java.util.function.Consumer;
-
-/**
- * Inactive {@link BucketLifeCycleListener} to obtain inactive buckets to 
consumer.
- */
-public class InactiveBucketListener implements 
BucketLifeCycleListener<RowData, String> {
-
-       private transient Consumer<String> inactiveConsumer;
-
-       public void setInactiveConsumer(Consumer<String> inactiveConsumer) {
-               this.inactiveConsumer = inactiveConsumer;
-       }
-
-       @Override
-       public void bucketCreated(Bucket<RowData, String> bucket) {
-       }
-
-       @Override
-       public void bucketInactive(Bucket<RowData, String> bucket) {
-               inactiveConsumer.accept(bucket.getBucketId());
-       }
-}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index 842f833..c2186bf 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.filesystem.stream;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
 import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
@@ -51,14 +53,12 @@ public class StreamingFileWriter extends 
AbstractStreamOperator<CommitMessage>
 
        private final long bucketCheckInterval;
 
-       private final StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
-                       StreamingFileSink.BucketsBuilder<RowData, ?, ?>> 
bucketsBuilder;
-
-       private final InactiveBucketListener listener;
+       private final StreamingFileSink.BucketsBuilder<RowData, String, ? 
extends
+                       StreamingFileSink.BucketsBuilder<RowData, String, ?>> 
bucketsBuilder;
 
        // --------------------------- runtime fields 
-----------------------------
 
-       private transient Buckets<RowData, ?> buckets;
+       private transient Buckets<RowData, String> buckets;
 
        private transient StreamingFileSinkHelper<RowData> helper;
 
@@ -68,12 +68,10 @@ public class StreamingFileWriter extends 
AbstractStreamOperator<CommitMessage>
 
        public StreamingFileWriter(
                        long bucketCheckInterval,
-                       StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
-                                       
StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
-                       InactiveBucketListener listener) {
+                       StreamingFileSink.BucketsBuilder<RowData, String, ? 
extends
+                                       
StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder) {
                this.bucketCheckInterval = bucketCheckInterval;
                this.bucketsBuilder = bucketsBuilder;
-               this.listener = listener;
                setChainingStrategy(ChainingStrategy.ALWAYS);
        }
 
@@ -90,7 +88,17 @@ public class StreamingFileWriter extends 
AbstractStreamOperator<CommitMessage>
 
                inactivePartitions = new HashSet<>();
                currentWatermark = Long.MIN_VALUE;
-               listener.setInactiveConsumer(b -> inactivePartitions.add(b));
+               buckets.setBucketLifeCycleListener(new 
BucketLifeCycleListener<RowData, String>() {
+
+                       @Override
+                       public void bucketCreated(Bucket<RowData, String> 
bucket) {
+                       }
+
+                       @Override
+                       public void bucketInactive(Bucket<RowData, String> 
bucket) {
+                               inactivePartitions.add(bucket.getBucketId());
+                       }
+               });
        }
 
        @Override

Reply via email to