This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5a9fe5d431cc7722a0af0ab2d6a95460e7e38e77 Author: Gao Yun <[email protected]> AuthorDate: Wed May 27 10:46:46 2020 +0800 [FLINK-17934][fs-connector] Add listener to Buckets and remove listener for BucketsBuilder --- .../flink/connectors/hive/HiveTableSink.java | 7 +--- .../HadoopPathBasedBulkFormatBuilder.java | 13 ------ .../sink/filesystem/BucketLifeCycleListener.java | 4 +- .../api/functions/sink/filesystem/Buckets.java | 12 +++--- .../sink/filesystem/StreamingFileSink.java | 18 --------- .../sink/filesystem/BucketAssignerITCases.java | 1 - .../api/functions/sink/filesystem/BucketsTest.java | 17 ++++---- .../sink/filesystem/RollingPolicyTest.java | 1 - .../table/filesystem/FileSystemTableSink.java | 12 ++---- .../filesystem/stream/InactiveBucketListener.java | 46 ---------------------- .../filesystem/stream/StreamingFileWriter.java | 28 ++++++++----- 11 files changed, 39 insertions(+), 120 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java index aa83d7a..35e355c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java @@ -46,7 +46,6 @@ import org.apache.flink.table.filesystem.FileSystemOutputFormat; import org.apache.flink.table.filesystem.FileSystemTableSink; import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner; import org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy; -import org.apache.flink.table.filesystem.stream.InactiveBucketListener; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.table.sinks.OverwritableTableSink; import org.apache.flink.table.sinks.PartitionableTableSink; @@ -185,16 +184,14 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS true, conf.get(SINK_ROLLING_POLICY_FILE_SIZE), conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis()); - InactiveBucketListener listener = new InactiveBucketListener(); Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd); - BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> builder; + BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder; if (userMrWriter || !bulkFactory.isPresent()) { HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory); builder = new HadoopPathBasedBulkFormatBuilder<>( new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner) .withRollingPolicy(rollingPolicy) - .withBucketLifeCycleListener(listener) .withOutputFileConfig(outputFileConfig); LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); } else { @@ -202,7 +199,6 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS new org.apache.flink.core.fs.Path(sd.getLocation()), new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer)) .withBucketAssigner(assigner) - .withBucketLifeCycleListener(listener) .withRollingPolicy(rollingPolicy) .withOutputFileConfig(outputFileConfig); LOG.info("Hive streaming sink: Use native parquet&orc writer."); @@ -215,7 +211,6 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS overwrite, dataStream, builder, - listener, msFactory); } } catch (TException e) { diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java index df51ff4..6ec8651 100644 --- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java +++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; -import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.Path; import org.apache.flink.formats.hadoop.bulk.DefaultHadoopFileCommitterFactory; import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory; @@ -30,8 +29,6 @@ import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; -import javax.annotation.Nullable; - import java.io.IOException; /** @@ -54,9 +51,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath private CheckpointRollingPolicy<IN, BucketID> rollingPolicy; - @Nullable - private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener; - private BucketFactory<IN, BucketID> bucketFactory; private OutputFileConfig outputFileConfig; @@ -108,12 +102,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath return self(); } - @Internal - public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) { - this.bucketLifeCycleListener = Preconditions.checkNotNull(listener); - return self(); - } - public T withBucketFactory(BucketFactory<IN, BucketID> factory) { this.bucketFactory = Preconditions.checkNotNull(factory); return self(); @@ -140,7 +128,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath writerFactory, fileCommitterFactory), rollingPolicy, - bucketLifeCycleListener, subtaskIndex, outputFileConfig); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java index f90f2a8..6667196 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java @@ -20,13 +20,11 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; -import java.io.Serializable; - /** * Listener about the status of {@link Bucket}. */ @Internal -public interface BucketLifeCycleListener<IN, BucketID> extends Serializable { +public interface BucketLifeCycleListener<IN, BucketID> { /** * Notifies a new bucket has been created. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index 0c9b73f..39acc29 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -63,9 +63,6 @@ public class Buckets<IN, BucketID> { private final RollingPolicy<IN, BucketID> rollingPolicy; - @Nullable - private final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener; - // --------------------------- runtime fields ----------------------------- private final int subtaskIndex; @@ -78,6 +75,9 @@ public class Buckets<IN, BucketID> { private final OutputFileConfig outputFileConfig; + @Nullable + private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener; + // --------------------------- State Related Fields ----------------------------- private final BucketStateSerializer<BucketID> bucketStateSerializer; @@ -97,7 +97,6 @@ public class Buckets<IN, BucketID> { final BucketFactory<IN, BucketID> bucketFactory, final BucketWriter<IN, BucketID> bucketWriter, final RollingPolicy<IN, BucketID> rollingPolicy, - @Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener, final int subtaskIndex, final OutputFileConfig outputFileConfig) { @@ -106,7 +105,6 @@ public class Buckets<IN, BucketID> { this.bucketFactory = Preconditions.checkNotNull(bucketFactory); this.bucketWriter = Preconditions.checkNotNull(bucketWriter); this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy); - this.bucketLifeCycleListener = bucketLifeCycleListener; this.subtaskIndex = subtaskIndex; this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig); @@ -121,6 +119,10 @@ public class Buckets<IN, BucketID> { this.maxPartCounter = 0L; } + public void setBucketLifeCycleListener(BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener) { + this.bucketLifeCycleListener = Preconditions.checkNotNull(bucketLifeCycleListener); + } + /** * Initializes the state after recovery from a failure. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java index 0962799..407420d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java @@ -189,8 +189,6 @@ public class StreamingFileSink<IN> private OutputFileConfig outputFileConfig; - private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener; - protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) { this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build()); } @@ -231,12 +229,6 @@ public class StreamingFileSink<IN> return self(); } - @Internal - public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) { - this.bucketLifeCycleListener = Preconditions.checkNotNull(listener); - return self(); - } - public T withOutputFileConfig(final OutputFileConfig outputFileConfig) { this.outputFileConfig = outputFileConfig; return self(); @@ -267,7 +259,6 @@ public class StreamingFileSink<IN> bucketFactory, new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder), rollingPolicy, - bucketLifeCycleListener, subtaskIndex, outputFileConfig); } @@ -303,8 +294,6 @@ public class StreamingFileSink<IN> private CheckpointRollingPolicy<IN, BucketID> rollingPolicy; - private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener; - private BucketFactory<IN, BucketID> bucketFactory; private OutputFileConfig outputFileConfig; @@ -350,12 +339,6 @@ public class StreamingFileSink<IN> return self(); } - @Internal - public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) { - this.bucketLifeCycleListener = Preconditions.checkNotNull(listener); - return self(); - } - @VisibleForTesting T withBucketFactory(final BucketFactory<IN, BucketID> factory) { this.bucketFactory = Preconditions.checkNotNull(factory); @@ -387,7 +370,6 @@ public class StreamingFileSink<IN> bucketFactory, new BulkBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory), rollingPolicy, - bucketLifeCycleListener, subtaskIndex, outputFileConfig); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java index ff2cc5a..e51043e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java @@ -57,7 +57,6 @@ public class BucketAssignerITCases { new DefaultBucketFactoryImpl<>(), new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()), rollingPolicy, - null, 0, OutputFileConfig.builder().build() ); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java index 8e2117a..9707ec7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java @@ -36,8 +36,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.annotation.Nullable; - import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -324,7 +322,6 @@ public class BucketsTest { new DefaultBucketFactoryImpl<>(), new RowWiseBucketWriter<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()), DefaultRollingPolicy.builder().build(), - null, 2, OutputFileConfig.builder().build() ); @@ -452,19 +449,23 @@ public class BucketsTest { private static Buckets<String, String> createBuckets( final Path basePath, final RollingPolicy<String, String> rollingPolicy, - @Nullable final BucketLifeCycleListener<String, String> bucketLifeCycleListener, + final BucketLifeCycleListener<String, String> bucketLifeCycleListener, final int subtaskIdx, final OutputFileConfig outputFileConfig) throws IOException { - return new Buckets<>( + Buckets<String, String> buckets = new Buckets<>( basePath, new TestUtils.StringIdentityBucketAssigner(), new DefaultBucketFactoryImpl<>(), new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()), rollingPolicy, - bucketLifeCycleListener, subtaskIdx, - outputFileConfig - ); + outputFileConfig); + + if (bucketLifeCycleListener != null) { + buckets.setBucketLifeCycleListener(bucketLifeCycleListener); + } + + return buckets; } private static Buckets<String, String> restoreBuckets( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java index 2a4da34..1dbd30f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java @@ -204,7 +204,6 @@ public class RollingPolicyTest { new DefaultBucketFactoryImpl<>(), new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()), rollingPolicyToTest, - null, 0, OutputFileConfig.builder().build() ); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java index 8ac6c0b..6408145 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java @@ -46,7 +46,6 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.FileSystemFormatFactory; -import org.apache.flink.table.filesystem.stream.InactiveBucketListener; import org.apache.flink.table.filesystem.stream.StreamingFileCommitter; import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage; import org.apache.flink.table.filesystem.stream.StreamingFileWriter; @@ -150,21 +149,18 @@ public class FileSystemTableSink implements conf.get(SINK_ROLLING_POLICY_FILE_SIZE), conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis()); - BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder; - InactiveBucketListener listener = new InactiveBucketListener(); + BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder; if (writer instanceof Encoder) { //noinspection unchecked bucketsBuilder = StreamingFileSink.forRowFormat( path, new ProjectionEncoder((Encoder<RowData>) writer, computer)) .withBucketAssigner(assigner) - .withBucketLifeCycleListener(listener) .withRollingPolicy(rollingPolicy); } else { //noinspection unchecked bucketsBuilder = StreamingFileSink.forBulkFormat( path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer)) .withBucketAssigner(assigner) - .withBucketLifeCycleListener(listener) .withRollingPolicy(rollingPolicy); } return createStreamingSink( @@ -175,7 +171,6 @@ public class FileSystemTableSink implements overwrite, dataStream, bucketsBuilder, - listener, metaStoreFactory); } } @@ -187,15 +182,14 @@ public class FileSystemTableSink implements ObjectIdentifier tableIdentifier, boolean overwrite, DataStream<RowData> inputStream, - BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder, - InactiveBucketListener listener, + BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder, TableMetaStoreFactory msFactory) { if (overwrite) { throw new IllegalStateException("Streaming mode not support overwrite."); } StreamingFileWriter fileWriter = new StreamingFileWriter( - BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, bucketsBuilder, listener); + BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, bucketsBuilder); DataStream<CommitMessage> writerStream = inputStream.transform( StreamingFileWriter.class.getSimpleName(), TypeExtractor.createTypeInfo(CommitMessage.class), diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java deleted file mode 100644 index 69ab147..0000000 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.filesystem.stream; - -import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket; -import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener; -import org.apache.flink.table.data.RowData; - -import java.util.function.Consumer; - -/** - * Inactive {@link BucketLifeCycleListener} to obtain inactive buckets to consumer. - */ -public class InactiveBucketListener implements BucketLifeCycleListener<RowData, String> { - - private transient Consumer<String> inactiveConsumer; - - public void setInactiveConsumer(Consumer<String> inactiveConsumer) { - this.inactiveConsumer = inactiveConsumer; - } - - @Override - public void bucketCreated(Bucket<RowData, String> bucket) { - } - - @Override - public void bucketInactive(Bucket<RowData, String> bucket) { - inactiveConsumer.accept(bucket.getBucketId()); - } -} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java index 842f833..c2186bf 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java @@ -20,6 +20,8 @@ package org.apache.flink.table.filesystem.stream; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket; +import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener; import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper; @@ -51,14 +53,12 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage> private final long bucketCheckInterval; - private final StreamingFileSink.BucketsBuilder<RowData, ?, ? extends - StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder; - - private final InactiveBucketListener listener; + private final StreamingFileSink.BucketsBuilder<RowData, String, ? extends + StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder; // --------------------------- runtime fields ----------------------------- - private transient Buckets<RowData, ?> buckets; + private transient Buckets<RowData, String> buckets; private transient StreamingFileSinkHelper<RowData> helper; @@ -68,12 +68,10 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage> public StreamingFileWriter( long bucketCheckInterval, - StreamingFileSink.BucketsBuilder<RowData, ?, ? extends - StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder, - InactiveBucketListener listener) { + StreamingFileSink.BucketsBuilder<RowData, String, ? extends + StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder) { this.bucketCheckInterval = bucketCheckInterval; this.bucketsBuilder = bucketsBuilder; - this.listener = listener; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -90,7 +88,17 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage> inactivePartitions = new HashSet<>(); currentWatermark = Long.MIN_VALUE; - listener.setInactiveConsumer(b -> inactivePartitions.add(b)); + buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() { + + @Override + public void bucketCreated(Bucket<RowData, String> bucket) { + } + + @Override + public void bucketInactive(Bucket<RowData, String> bucket) { + inactivePartitions.add(bucket.getBucketId()); + } + }); } @Override
