This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c56def03b1debc1c26759d57c017ae19b3a05eea Author: Arvid Heise <[email protected]> AuthorDate: Thu Sep 5 13:21:40 2024 +0200 [FLINK-25920] Straighten EOI handling in CommittableCollector In some parts of the sink, EOI is treated as checkpointId=null and in others as checkpointId=MAX. The code of CheckpointCommittableManagerImpl implies that null is valid; however, the serializer actually breaks in that case. In practice, checkpointId=MAX is used all the time by accident. This commit replaces the nullable checkpointIds with a primitive long EOI=MAX, so that we always use the special value instead of null. The serializer already used that value, so it actually simplifies many places and doesn't break any existing state. --- .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e | 148 ++++++++++----------- .../sink/compactor/operator/CompactorOperator.java | 10 +- .../operator/CompactorOperatorStateHandler.java | 10 +- .../api/connector/sink2/CommittableMessage.java | 22 ++- .../sink2/CommittableMessageSerializer.java | 19 +-- .../sink2/CommittableMessageTypeInfo.java | 2 +- .../api/connector/sink2/CommittableSummary.java | 58 +++++++- .../connector/sink2/CommittableWithLineage.java | 43 +++++- .../runtime/operators/sink/CommitterOperator.java | 7 +- .../runtime/operators/sink/SinkWriterOperator.java | 4 +- .../CheckpointCommittableManagerImpl.java | 28 ++-- .../sink/committables/CommittableCollector.java | 11 +- .../committables/SubtaskCommittableManager.java | 11 +- .../sink2/CommittableMessageSerializerTest.java | 4 +- .../connector/sink2/CommittableSummaryAssert.java | 10 +- .../sink2/CommittableWithLinageAssert.java | 13 +- .../sink2/GlobalCommitterOperatorTest.java | 9 +- .../committables/CommittableCollectorTest.java | 6 +- .../operators/sink/CommitterOperatorTestBase.java | 15 +-- .../operators/sink/SinkWriterOperatorTestBase.java | 11 +- 20 files changed, 245 insertions(+), 196 deletions(-) diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e index 107dc673280..91d8cab4e67 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e +++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e @@ -54,9 +54,9 @@ Constructor <org.apache.flink.connector.datagen.table.types.DataGeneratorMapper. Constructor <org.apache.flink.connector.datagen.table.types.RowDataGenerator.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.util.List, float)> depends on component type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (RowDataGenerator.java:0) Constructor <org.apache.flink.connector.datagen.table.types.RowDataGenerator.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;, java.util.List, float)> has parameter of type <[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in (RowDataGenerator.java:0) Constructor <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.<init>(org.apache.flink.core.fs.Path, long, org.apache.flink.api.common.serialization.BulkWriter$Factory, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner, org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy, org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileCo [...] 
-Constructor <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.<init>(org.apache.flink.core.fs.Path, org.apache.flink.api.common.serialization.BulkWriter$Factory, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSink.java:533) +Constructor <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.<init>(org.apache.flink.core.fs.Path, org.apache.flink.api.common.serialization.BulkWriter$Factory, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSink.java:548) Constructor <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.<init>(org.apache.flink.core.fs.Path, long, org.apache.flink.api.common.serialization.Encoder, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apac [...] 
-Constructor <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.<init>(org.apache.flink.core.fs.Path, org.apache.flink.api.common.serialization.Encoder, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSink.java:355) +Constructor <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.<init>(org.apache.flink.core.fs.Path, org.apache.flink.api.common.serialization.Encoder, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSink.java:370) Constructor <org.apache.flink.connector.file.sink.FileSinkCommittable.<init>(java.lang.String, org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable> in (FileSinkCommittable.java:0) Constructor <org.apache.flink.connector.file.sink.FileSinkCommittable.<init>(java.lang.String, org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable> in (FileSinkCommittable.java:0) Constructor <org.apache.flink.connector.file.sink.FileSinkCommittable.<init>(java.lang.String, org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable, org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable, org.apache.flink.core.fs.Path)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable> in (FileSinkCommittable.java:0) @@ -100,7 +100,7 @@ Constructor 
<org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAss Constructor <org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner.<init>(java.util.Collection)> calls constructor <org.apache.flink.metrics.SimpleCounter.<init>()> in (LocalityAwareSplitAssigner.java:81) Constructor <org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator.<init>(org.apache.flink.api.connector.source.SplitEnumeratorContext, org.apache.flink.connector.file.src.enumerate.FileEnumerator, org.apache.flink.connector.file.src.assigners.FileSplitAssigner, [Lorg.apache.flink.core.fs.Path;, java.util.Collection, long)> has parameter of type <[Lorg.apache.flink.core.fs.Path;> in (ContinuousFileSplitEnumerator.java:0) Constructor <org.apache.flink.connector.file.table.ColumnarRowIterator.<init>(org.apache.flink.table.data.columnar.ColumnarRowData, java.lang.Runnable)> has parameter of type <org.apache.flink.table.data.columnar.ColumnarRowData> in (ColumnarRowIterator.java:0) -Constructor <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.<init>()> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.<init>(java.lang.String, java.lang.String)> in (FileSystemOutputFormat.java:216) +Constructor <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.<init>()> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.<init>(java.lang.String, java.lang.String)> in (FileSystemOutputFormat.java:235) Constructor <org.apache.flink.connector.file.table.FileSystemOutputFormat.<init>(org.apache.flink.connector.file.table.FileSystemFactory, org.apache.flink.connector.file.table.TableMetaStoreFactory, boolean, boolean, org.apache.flink.core.fs.Path, [Ljava.lang.String;, boolean, java.util.LinkedHashMap, org.apache.flink.connector.file.table.OutputFormatFactory, org.apache.flink.connector.file.table.PartitionComputer, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig [...] Constructor <org.apache.flink.connector.file.table.FileSystemOutputFormat.<init>(org.apache.flink.connector.file.table.FileSystemFactory, org.apache.flink.connector.file.table.TableMetaStoreFactory, boolean, boolean, org.apache.flink.core.fs.Path, [Ljava.lang.String;, boolean, java.util.LinkedHashMap, org.apache.flink.connector.file.table.OutputFormatFactory, org.apache.flink.connector.file.table.PartitionComputer, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig [...] Constructor <org.apache.flink.connector.file.table.PartitionTempFileManager.<init>(org.apache.flink.connector.file.table.FileSystemFactory, org.apache.flink.core.fs.Path, int, int)> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.<init>(java.lang.String, java.lang.String)> in (PartitionTempFileManager.java:71) @@ -174,8 +174,8 @@ Field <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter. 
Field <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.writer> has type <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter> in (CompactBucketWriter.java:0) Field <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.writer> has type <org.apache.flink.api.common.functions.util.PrintSinkOutputWriter> in (PrintTableSinkFactory.java:0) Method <org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceBuilder.addSource(org.apache.flink.connector.base.source.hybrid.HybridSource$SourceFactory, org.apache.flink.api.connector.source.Boundedness)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (HybridSource.java:246) -Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> calls method <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)> in (HybridSourceSplitEnumerator.java:448) -Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> checks instanceof <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in (HybridSourceSplitEnumerator.java:440) +Method <org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> calls method <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)> in (HybridSourceSplitEnumerator.java:450) +Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext, int, int, int)> checks instanceof <org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in (HybridSourceSplitEnumerator.java:442) Method <org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.getNumAliveFetchers()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (SplitFetcherManager.java:0) Method <org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.take()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FutureCompletingBlockingQueue.java:0) Method <org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.getSerializer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FromElementsGeneratorFunction.java:0) @@ -183,7 +183,7 @@ Method <org.apache.flink.connector.datagen.functions.IndexLookupGeneratorFunctio Method <org.apache.flink.connector.datagen.source.DataGeneratorSource.getGeneratorFunction()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DataGeneratorSource.java:0) Method <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)> calls constructor <org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.<init>(org.apache.flink.api.connector.source.SourceReader, org.apache.flink.api.connector.source.util.ratelimit.RateLimiter)> in (GeneratorSourceReaderFactory.java:63) Method <org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)> calls method <org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy.createRateLimiter(int)> in (GeneratorSourceReaderFactory.java:62) -Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> calls constructor <org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, long, java.lang.Long)> in (DataGenTableSource.java:67) +Method <org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> calls constructor <org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator, long, java.lang.Long)> in (DataGenTableSource.java:72) Method <org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> has return type <org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource> in (DataGenTableSource.java:0) Method <org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (DataGenTableSource.java:0) Method <org.apache.flink.connector.datagen.table.DataGeneratorContainer.getGenerator()> has return type <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in (DataGeneratorContainer.java:0) @@ -217,10 +217,10 @@ Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(or Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.TinyIntType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)> in (RandomGeneratorVisitor.java:192) Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarBinaryType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withVarLen(boolean)> in (RandomGeneratorVisitor.java:179) Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarCharType)> 
calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)> in (RandomGeneratorVisitor.java:158) -Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarCharType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withVarLen(boolean)> in (RandomGeneratorVisitor.java:172) -Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.intGenerator(int, int)> in (RandomGeneratorVisitor.java:292) -Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)> in (RandomGeneratorVisitor.java:293) -Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.ZonedTimestampType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)> in (RandomGeneratorVisitor.java:331) +Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarCharType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withVarLen(boolean)> in (RandomGeneratorVisitor.java:159) +Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.intGenerator(int, int)> in (RandomGeneratorVisitor.java:287) +Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)> calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)> in (RandomGeneratorVisitor.java:288) +Method <org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.ZonedTimestampType)> calls method <org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)> in (RandomGeneratorVisitor.java:326) Method <org.apache.flink.connector.datagen.table.SequenceGeneratorVisitor$2.next()> calls method <org.apache.flink.shaded.guava32.com.google.common.primitives.Longs.toByteArray(long)> in (SequenceGeneratorVisitor.java:221) Method <org.apache.flink.connector.datagen.table.SequenceGeneratorVisitor.getSequenceBytesGenerator(long, long)> has return type <org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator> in (SequenceGeneratorVisitor.java:0) Method <org.apache.flink.connector.datagen.table.SequenceGeneratorVisitor.getSequenceStringGenerator(long, long)> has return type <org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator> in (SequenceGeneratorVisitor.java:0) @@ -240,37 +240,37 @@ Method <org.apache.flink.connector.datagen.table.types.RowDataGenerator.next()> Method <org.apache.flink.connector.datagen.table.types.RowDataGenerator.open(java.lang.String, org.apache.flink.runtime.state.FunctionInitializationContext, org.apache.flink.api.common.functions.RuntimeContext)> calls method <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator.open(java.lang.String, org.apache.flink.runtime.state.FunctionInitializationContext, org.apache.flink.api.common.functions.RuntimeContext)> in (RowDataGenerator.java:54) Method <org.apache.flink.connector.datagen.table.types.RowDataGenerator.snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext)> calls method <org.apache.flink.streaming.api.functions.source.datagen.DataGenerator.snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext)> in 
(RowDataGenerator.java:61) Method <org.apache.flink.connector.file.sink.FileSink$BucketsBuilder.createBucketWriter()> has return type <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in (FileSink.java:0) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter()> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter, org.apache.flink.api.common.serialization.BulkWriter$Factory)> in (FileSink.java:674) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter()> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter, org.apache.flink.api.common.serialization.BulkWriter$Factory)> in (FileSink.java:698) Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter()> has return type <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in (FileSink.java:0) Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSink.java:638) Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartPrefix()> in (FileSink.java:639) Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartSuffix()> in (FileSink.java:640) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:668) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:669) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:669) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:668) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:658) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:659) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:658) -Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:659) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> 
in (FileSink.java:692) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:693) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:693) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:692) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:682) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:683) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:682) +Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:683) Method <org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.withOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (FileSink.java:0) 
-Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter()> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter, org.apache.flink.api.common.serialization.Encoder)> in (FileSink.java:480) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter()> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter, org.apache.flink.api.common.serialization.Encoder)> in (FileSink.java:495) Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter()> has return type <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in (FileSink.java:0) Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.WriterInitContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSink.java:435) Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.WriterInitContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartPrefix()> in (FileSink.java:436) Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.WriterInitContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartSuffix()> in (FileSink.java:437) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:474) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:475) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:475) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:474) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:464) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:465) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:464) -Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:465) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:489) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in 
(FileSink.java:490) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:490) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:489) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:479) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()> in (FileSink.java:480) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()> in (FileSink.java:479) +Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()> in (FileSink.java:480) Method <org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.withOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (FileSink.java:0) -Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSink.java:231) 
-Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSink.java:243) -Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSink.java:275) +Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSink.java:246) +Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSink.java:258) +Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSink.java:290) Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (FileSink.java:0) Method <org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic return type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>>> with type 
argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (FileSink.java:0) Method <org.apache.flink.connector.file.sink.FileSinkCommittable.getInProgressFileToCleanup()> has return type <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable> in (FileSinkCommittable.java:0) @@ -330,26 +330,26 @@ Method <org.apache.flink.connector.file.sink.compactor.operator.CompactService.g Method <org.apache.flink.connector.file.sink.compactor.operator.CompactService.getWriterType(org.apache.flink.connector.file.sink.compactor.FileCompactor)> gets field <org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter$Type.RECORD_WISE> in (CompactService.java:169) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactService.getWriterType(org.apache.flink.connector.file.sink.compactor.FileCompactor)> has return type <org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter$Type> in (CompactService.java:0) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactService.open()> calls method <org.apache.flink.runtime.util.Hardware.getNumberCPUCores()> in (CompactService.java:70) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserialize(int, [B)> calls constructor <org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in (CompactorOperator.java:301) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)> calls method <org.apache.flink.core.memory.DataInputDeserializer.readInt()> in (CompactorOperator.java:325) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)> calls method 
<org.apache.flink.core.memory.DataInputDeserializer.readLong()> in (CompactorOperator.java:328) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserialize(int, [B)> calls constructor <org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in (CompactorOperator.java:299) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)> calls method <org.apache.flink.core.memory.DataInputDeserializer.readInt()> in (CompactorOperator.java:323) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)> calls method <org.apache.flink.core.memory.DataInputDeserializer.readLong()> in (CompactorOperator.java:326) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)> has parameter of type <org.apache.flink.core.memory.DataInputDeserializer> in (CompactorOperator.java:0) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)> calls constructor <org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in (CompactorOperator.java:292) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)> calls method <org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer()> in (CompactorOperator.java:295) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> in (CompactorOperator.java:293) -Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map, org.apache.flink.core.memory.DataOutputSerializer)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> in (CompactorOperator.java:315) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map, org.apache.flink.core.memory.DataOutputSerializer)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeLong(long)> in (CompactorOperator.java:317) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)> calls constructor <org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in (CompactorOperator.java:290) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)> calls method <org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer()> in (CompactorOperator.java:293) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> in (CompactorOperator.java:291) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map, org.apache.flink.core.memory.DataOutputSerializer)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> in (CompactorOperator.java:313) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map, org.apache.flink.core.memory.DataOutputSerializer)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeLong(long)> in (CompactorOperator.java:315) Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map, org.apache.flink.core.memory.DataOutputSerializer)> has parameter of type <org.apache.flink.core.memory.DataOutputSerializer> in (CompactorOperator.java:0) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, int, java.lang.Long, int, int, int)> in (CompactorOperator.java:254) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object, java.lang.Long, int)> in (CompactorOperator.java:262) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactorOperator.java:256) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactorOperator.java:263) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactorOperator.java:250) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactorOperator.java:251) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)> calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactorOperator.java:262) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, int, long, int, int, int)> in (CompactorOperator.java:252) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object, long, int)> in (CompactorOperator.java:260) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactorOperator.java:254) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactorOperator.java:261) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactorOperator.java:248) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactorOperator.java:249) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactorOperator.java:260) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.getAllTasksFuture()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(CompactorOperator.java:0) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.operators.util.SimpleVersionedListState.<init>(org.apache.flink.api.common.state.ListState, org.apache.flink.core.io.SimpleVersionedSerializer)> in (CompactorOperator.java:187) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactorOperator.java:135) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.operators.util.SimpleVersionedListState.<init>(org.apache.flink.api.common.state.ListState, org.apache.flink.core.io.SimpleVersionedSerializer)> in (CompactorOperator.java:185) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactorOperator.java:133) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactorOperator.java:0) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactorOperator.java:0) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)> calls method <org.apache.flink.streaming.api.graph.StreamConfig.getOperatorName()> in (CompactorOperatorFactory.java:84) @@ -361,10 +361,12 @@ Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperato Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)> has generic parameter type <org.apache.flink.streaming.api.operators.StreamOperatorParameters<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>>> with type argument depending on <org.apache.flink.streaming.api.operators.StreamOperatorParame [...] Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)> has parameter of type <org.apache.flink.streaming.api.operators.StreamOperatorParameters> in (CompactorOperatorFactory.java:0) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)> has type parameter 'T' depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (CompactorOperatorFactory.java:0) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, int, java.lang.Long, int, int, int)> in (CompactorOperatorStateHandler.java:177) -Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object, java.lang.Long, int)> in (CompactorOperatorStateHandler.java:184) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, int, long, int, int, int)> in (CompactorOperatorStateHandler.java:177) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object, long, int)> in (CompactorOperatorStateHandler.java:184) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactorOperatorStateHandler.java:177) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactorOperatorStateHandler.java:184) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getCheckpointIdOrEOI()> in (CompactorOperatorStateHandler.java:174) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getCheckpointIdOrEOI()> in (CompactorOperatorStateHandler.java:183) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getNumberOfCommittables()> in (CompactorOperatorStateHandler.java:175) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getNumberOfFailedCommittables()> in (CompactorOperatorStateHandler.java:177) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getNumberOfPendingCommittables()> in (CompactorOperatorStateHandler.java:176) @@ -373,11 +375,9 @@ Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperato Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getSubtaskId()> in (CompactorOperatorStateHandler.java:184) Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> has generic parameter type <org.apache.flink.streaming.api.connector.sink2.CommittableSummary<org.apache.flink.connector.file.sink.FileSinkCommittable>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableSummary> in (CompactorOperatorStateHandler.java:0) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)> has parameter of type <org.apache.flink.streaming.api.connector.sink2.CommittableSummary> in (CompactorOperatorStateHandler.java:0) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.getCheckpointId(org.apache.flink.streaming.api.connector.sink2.CommittableMessage)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableMessage.getCheckpointId()> in (CompactorOperatorStateHandler.java:279) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.getCheckpointId(org.apache.flink.streaming.api.connector.sink2.CommittableMessage)> has generic parameter type <org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (CompactorOperatorStateHandler.java:0) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.getCheckpointId(org.apache.flink.streaming.api.connector.sink2.CommittableMessage)> has parameter of type <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (CompactorOperatorStateHandler.java:0) -Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object, java.lang.Long, int)> in (CompactorOperatorStateHandler.java:218) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)> calls constructor <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object, long, int)> in (CompactorOperatorStateHandler.java:218) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactorOperatorStateHandler.java:218) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.getCheckpointIdOrEOI()> in (CompactorOperatorStateHandler.java:207) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)> calls method <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.getCommittable()> in (CompactorOperatorStateHandler.java:196) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)> calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.getSubtaskId()> in (CompactorOperatorStateHandler.java:218) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)> has generic parameter type <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage<org.apache.flink.connector.file.sink.FileSinkCommittable>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage> in (CompactorOperatorStateHandler.java:0) @@ -417,7 +417,7 @@ Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer.serialize(org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> in (CompactorRequestSerializer.java:53) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer.serializeV1(org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest, org.apache.flink.core.memory.DataOutputSerializer)> calls method <org.apache.flink.core.memory.DataOutputSerializer.writeUTF(java.lang.String)> in (CompactorRequestSerializer.java:73) Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer.serializeV1(org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest, org.apache.flink.core.memory.DataOutputSerializer)> has parameter of type <org.apache.flink.core.memory.DataOutputSerializer> in (CompactorRequestSerializer.java:0) -Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestTypeInfo.createSerializer(org.apache.flink.api.common.serialization.SerializerConfig)> calls constructor 
<org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy.<init>(org.apache.flink.util.function.SerializableSupplier)> in (CompactorRequestTypeInfo.java:80) +Method <org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestTypeInfo.createSerializer(org.apache.flink.api.common.serialization.SerializerConfig)> calls constructor <org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy.<init>(org.apache.flink.util.function.SerializableSupplier)> in (CompactorRequestTypeInfo.java:81) Method <org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory.getNewBucket(java.lang.String, org.apache.flink.core.fs.Path, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has generic parameter type <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN, java.lang.String>> with ty [...] Method <org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory.getNewBucket(java.lang.String, org.apache.flink.core.fs.Path, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in (DefaultFileWriterBucketFactory [...] 
Method <org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory.getNewBucket(java.lang.String, org.apache.flink.core.fs.Path, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (DefaultFileWriterBucketFac [...] @@ -513,7 +513,7 @@ Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapRe Method <org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, org.apache.flink.connector.file.src.FileSourceSplit)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileInfoExtractorBulkFormat.java:140) Method <org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitionsWithFiles(java.util.Map)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (FileSystemCommitter.java:146) Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in (FileSystemOutputFormat.java:0) -Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FileSystemOutputFormat.java:291) +Method <org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (FileSystemOutputFormat.java:0) Method 
<org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (FileSystemTableSink.java:553) Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, boolean)> in (FileSystemTableSink.java:208) Method <org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream, org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()> in (FileSystemTableSink.java:189) @@ -589,7 +589,7 @@ Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPa Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (PartitionCommitter.java:140) Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.PartitionCommitInfo>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (PartitionCommitter.java:0) Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has 
parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (PartitionCommitter.java:0) -Method <org.apache.flink.connector.file.table.stream.PartitionTimeCommitPredicate.isPartitionCommittable(org.apache.flink.connector.file.table.stream.PartitionCommitPredicate$PredicateContext)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues(org.apache.flink.core.fs.Path)> in (PartitionTimeCommitPredicate.java:71) +Method <org.apache.flink.connector.file.table.stream.PartitionTimeCommitPredicate.isPartitionCommittable(org.apache.flink.connector.file.table.stream.PartitionCommitPredicate$PredicateContext)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues(org.apache.flink.core.fs.Path)> in (PartitionTimeCommitPredicate.java:70) Method <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger$1.currentProcTime()> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (ProcTimeCommitTrigger.java:111) Method <org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.addPartition(java.lang.String)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (ProcTimeCommitTrigger.java:76) Method <org.apache.flink.connector.file.table.stream.StreamingFileWriter.closePartFileForPartitions()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.closePartFileForBucket(java.lang.Object)> in (StreamingFileWriter.java:130) @@ -607,33 +607,33 @@ Method <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter Method <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.commit()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.closeForCommit()> in (CompactBucketWriter.java:49) Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.factory(org.apache.flink.util.function.SupplierWithException)> has generic parameter type <org.apache.flink.util.function.SupplierWithException<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<T, java.lang.String>, java.io.IOException>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in (CompactBucketWriter.java:0) Method <org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.write(java.lang.Object)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.write(java.lang.Object, long)> in (CompactBucketWriter.java:44) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:190) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:198) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:116) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:116) +Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:192) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long, java.util.Map)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactCoordinator.java:200) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:118) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactCoordinator.java:118) Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.serialization.SerializerConfig)> in (CompactCoordinator.java:118) Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactCoordinator.java:107) Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets 
field <org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in (CompactCoordinator.java:107) -Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactCoordinator.java:127) +Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactCoordinator.java:129) Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorInput>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactCoordinator.java:0) Method <org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactCoordinator.java:0) Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactFileWriter.java:63) Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactFileWriter.java:62) Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)> calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactFileWriter.java:63) Method <org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.onPartFileOpened(java.lang.String, org.apache.flink.core.fs.Path)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactFileWriter.java:52) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in (CompactOperator.java:162) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:160) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:161) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:109) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:109) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls constructor <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)> in 
(CompactOperator.java:164) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:162) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:163) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:111) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (CompactOperator.java:111) Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class, org.apache.flink.api.common.serialization.SerializerConfig)> in (CompactOperator.java:111) Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (CompactOperator.java:103) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method 
<org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in (CompactOperator.java:139) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()> in (CompactOperator.java:140) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:125) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:126) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactOperator.java:121) -Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in (CompactOperator.java:138) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in (CompactOperator.java:141) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method 
<org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()> in (CompactOperator.java:142) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:127) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (CompactOperator.java:128) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (CompactOperator.java:123) +Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in (CompactOperator.java:140) Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorOutput>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactOperator.java:0) Method <org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (CompactOperator.java:0) Method 
<org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.invoke(org.apache.flink.table.data.RowData, org.apache.flink.streaming.api.functions.sink.SinkFunction$Context)> calls method <org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(java.lang.Object)> in (PrintTableSinkFactory.java:189) @@ -641,7 +641,7 @@ Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrin Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.api.common.functions.OpenContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (PrintTableSinkFactory.java:181) Method <org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.api.common.functions.OpenContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (PrintTableSinkFactory.java:182) Static Initializer <org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE> in (CompactCoordinator.java:67) -Static Initializer <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE> in (CompactorOperator.java:85) +Static Initializer <org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE> in (CompactorOperator.java:83) Static Initializer <org.apache.flink.connector.file.src.FileSourceSplitSerializer.<clinit>()> calls constructor <org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in (FileSourceSplitSerializer.java:42) Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory.getInstance()> in (StandardDeCompressors.java:45) Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:43) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java index cf9f06252f5..0cb573d466f 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java @@ -46,8 +46,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -138,15 +136,15 @@ public class CompactorOperator @Override public void endInput() throws Exception { // add collecting requests into the final snapshot - checkpointRequests.put(Long.MAX_VALUE, collectingRequests); + checkpointRequests.put(CommittableMessage.EOI, collectingRequests); collectingRequests = new ArrayList<>(); // submit all requests and wait until they are done - submitUntil(Long.MAX_VALUE); + submitUntil(CommittableMessage.EOI); assert checkpointRequests.isEmpty(); getAllTasksFuture().join(); - emitCompacted(null); + emitCompacted(CommittableMessage.EOI); assert 
compactingRequests.isEmpty(); } @@ -223,7 +221,7 @@ public class CompactorOperator canSubmit.clear(); } - private void emitCompacted(@Nullable Long checkpointId) throws Exception { + private void emitCompacted(long checkpointId) throws Exception { List<FileSinkCommittable> compacted = new ArrayList<>(); Iterator<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>> iter = compactingRequests.iterator(); diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java index 7f8108db8ff..02bcbbdcac7 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java @@ -171,7 +171,7 @@ public class CompactorOperatorStateHandler new CommittableSummary<>( summary.getSubtaskId(), summary.getNumberOfSubtasks(), - getCheckpointId(summary), + summary.getCheckpointIdOrEOI(), summary.getNumberOfCommittables() + results.size(), summary.getNumberOfPendingCommittables() + results.size(), summary.getNumberOfFailedCommittables()))); @@ -180,7 +180,7 @@ public class CompactorOperatorStateHandler new StreamRecord<>( new CommittableWithLineage<>( committable, - getCheckpointId(summary), + summary.getCheckpointIdOrEOI(), summary.getSubtaskId()))); } } @@ -204,7 +204,7 @@ public class CompactorOperatorStateHandler // cleanup request to the next summary, since the count of pending committable // for this checkpoint is immutable now Iterable<FileSinkCommittable> result = submit(request).get(); - Long checkpointId = getCheckpointId(message); + Long checkpointId = message.getCheckpointIdOrEOI(); 
boolean pendingFileSent = false; for (FileSinkCommittable c : result) { if (c.hasPendingFile()) { @@ -275,10 +275,6 @@ public class CompactorOperatorStateHandler remainingRequestsState.update(Collections.singletonList(requestsMap)); } - private Long getCheckpointId(CommittableMessage<FileSinkCommittable> message) { - return message.getCheckpointId().isPresent() ? message.getCheckpointId().getAsLong() : null; - } - private CompletableFuture<Iterable<FileSinkCommittable>> submit(CompactorRequest request) { CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new CompletableFuture<>(); compactService.submit(request, resultFuture); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java index 754b5e374f4..7db0c29ecc6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java @@ -22,15 +22,33 @@ import org.apache.flink.annotation.Experimental; import java.util.OptionalLong; -/** The message send from {@link SinkWriter} to {@link Committer}. */ +/** The message send from {@code SinkWriter} to {@code Committer}. */ @Experimental public interface CommittableMessage<CommT> { + /** + * Special value for checkpointId for the end of input in case of batch commit or final + * checkpoint. + */ + long EOI = Long.MAX_VALUE; + /** The subtask that created this committable. */ int getSubtaskId(); /** * Returns the checkpoint id or empty if the message does not belong to a checkpoint. In that * case, the committable was created at the end of input (e.g., in batch mode). + * + * @see #getCheckpointIdOrEOI() + */ + @Deprecated + default OptionalLong getCheckpointId() { + long checkpointIdOrEOI = getCheckpointIdOrEOI(); + return checkpointIdOrEOI == EOI ? 
OptionalLong.empty() : OptionalLong.of(checkpointIdOrEOI); + } + + /** + * Returns the checkpoint id or EOI if this message belong to the final checkpoint or the batch + * commit. */ - OptionalLong getCheckpointId(); + long getCheckpointIdOrEOI(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java index 0fac7380dd9..cf251dd2d84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java @@ -42,7 +42,6 @@ public class CommittableMessageSerializer<CommT> @VisibleForTesting static final int VERSION = 1; private static final int COMMITTABLE = 1; private static final int SUMMARY = 2; - private static final long EOI = Long.MAX_VALUE; private final SimpleVersionedSerializer<CommT> committableSerializer; @@ -64,14 +63,14 @@ public class CommittableMessageSerializer<CommT> committableSerializer, ((CommittableWithLineage<CommT>) obj).getCommittable(), out); - writeCheckpointId(out, obj); + out.writeLong(obj.getCheckpointIdOrEOI()); out.writeInt(obj.getSubtaskId()); } else if (obj instanceof CommittableSummary) { out.writeByte(SUMMARY); out.writeInt(obj.getSubtaskId()); CommittableSummary<?> committableSummary = (CommittableSummary<?>) obj; out.writeInt(committableSummary.getNumberOfSubtasks()); - writeCheckpointId(out, obj); + out.writeLong(obj.getCheckpointIdOrEOI()); out.writeInt(committableSummary.getNumberOfCommittables()); out.writeInt(committableSummary.getNumberOfPendingCommittables()); out.writeInt(committableSummary.getNumberOfFailedCommittables()); @@ -91,13 +90,13 @@ public class CommittableMessageSerializer<CommT> return new CommittableWithLineage<>( SimpleVersionedSerialization.readVersionAndDeSerialize( 
committableSerializer, in), - readCheckpointId(in), + in.readLong(), in.readInt()); case SUMMARY: return new CommittableSummary<>( in.readInt(), in.readInt(), - readCheckpointId(in), + in.readLong(), in.readInt(), in.readInt(), in.readInt()); @@ -109,14 +108,4 @@ public class CommittableMessageSerializer<CommT> + StringUtils.byteToHexString(serialized)); } } - - private void writeCheckpointId(DataOutputSerializer out, CommittableMessage<CommT> obj) - throws IOException { - out.writeLong(obj.getCheckpointId().orElse(EOI)); - } - - private Long readCheckpointId(DataInputDeserializer in) throws IOException { - long checkpointId = in.readLong(); - return checkpointId == EOI ? null : checkpointId; - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java index ad8460ee9eb..c22868a7cf7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java @@ -50,7 +50,6 @@ public class CommittableMessageTypeInfo<CommT> extends TypeInformation<Committab * @param committableSerializerFactory factory to create the serializer for a {@link * CommittableMessage} * @param <CommT> type of the committable - * @return */ public static <CommT> TypeInformation<CommittableMessage<CommT>> of( SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory) { @@ -86,6 +85,7 @@ public class CommittableMessageTypeInfo<CommT> extends TypeInformation<Committab return 1; } + @SuppressWarnings({"unchecked", "rawtypes"}) @Override public Class<CommittableMessage<CommT>> getTypeClass() { return (Class) CommittableMessage.class; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java index 7171a5168a7..7f1fb003806 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java @@ -20,9 +20,7 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.annotation.Experimental; -import javax.annotation.Nullable; - -import java.util.OptionalLong; +import java.util.Objects; /** * This class tracks the information about committables belonging to one checkpoint coming from one @@ -38,7 +36,7 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> { /** May change after recovery. */ private final int numberOfSubtasks; - @Nullable private final Long checkpointId; + private final long checkpointId; /** The number of committables coming from the given subtask in the particular checkpoint. */ private final int numberOfCommittables; /** The number of committables that have not been successfully committed. */ @@ -49,7 +47,7 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> { public CommittableSummary( int subtaskId, int numberOfSubtasks, - @Nullable Long checkpointId, + long checkpointId, int numberOfCommittables, int numberOfPendingCommittables, int numberOfFailedCommittables) { @@ -69,8 +67,8 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> { return numberOfSubtasks; } - public OptionalLong getCheckpointId() { - return checkpointId == null ? 
OptionalLong.empty() : OptionalLong.of(checkpointId); + public long getCheckpointIdOrEOI() { + return checkpointId; } public int getNumberOfCommittables() { @@ -94,4 +92,50 @@ public class CommittableSummary<CommT> implements CommittableMessage<CommT> { numberOfPendingCommittables, numberOfFailedCommittables); } + + @Override + public String toString() { + return "CommittableSummary{" + + "subtaskId=" + + subtaskId + + ", numberOfSubtasks=" + + numberOfSubtasks + + ", checkpointId=" + + checkpointId + + ", numberOfCommittables=" + + numberOfCommittables + + ", numberOfPendingCommittables=" + + numberOfPendingCommittables + + ", numberOfFailedCommittables=" + + numberOfFailedCommittables + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CommittableSummary<?> that = (CommittableSummary<?>) o; + return subtaskId == that.subtaskId + && numberOfSubtasks == that.numberOfSubtasks + && checkpointId == that.checkpointId + && numberOfCommittables == that.numberOfCommittables + && numberOfPendingCommittables == that.numberOfPendingCommittables + && numberOfFailedCommittables == that.numberOfFailedCommittables; + } + + @Override + public int hashCode() { + return Objects.hash( + subtaskId, + numberOfSubtasks, + checkpointId, + numberOfCommittables, + numberOfPendingCommittables, + numberOfFailedCommittables); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java index 96683bfd2ca..bdfe9de262f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java @@ -21,9 +21,7 @@ package org.apache.flink.streaming.api.connector.sink2; 
import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink2.Committer; -import javax.annotation.Nullable; - -import java.util.OptionalLong; +import java.util.Objects; import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -35,10 +33,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Experimental public class CommittableWithLineage<CommT> implements CommittableMessage<CommT> { private final CommT committable; - @Nullable private final Long checkpointId; + private final long checkpointId; private final int subtaskId; - public CommittableWithLineage(CommT committable, @Nullable Long checkpointId, int subtaskId) { + public CommittableWithLineage(CommT committable, long checkpointId, int subtaskId) { this.committable = checkNotNull(committable); this.checkpointId = checkpointId; this.subtaskId = subtaskId; @@ -52,11 +50,42 @@ public class CommittableWithLineage<CommT> implements CommittableMessage<CommT> return subtaskId; } - public OptionalLong getCheckpointId() { - return checkpointId == null ? 
OptionalLong.empty() : OptionalLong.of(checkpointId); + public long getCheckpointIdOrEOI() { + return checkpointId; } public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT, NewCommT> mapper) { return new CommittableWithLineage<>(mapper.apply(committable), checkpointId, subtaskId); } + + @Override + public String toString() { + return "CommittableWithLineage{" + + "committable=" + + committable + + ", checkpointId=" + + checkpointId + + ", subtaskId=" + + subtaskId + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CommittableWithLineage<?> that = (CommittableWithLineage<?>) o; + return checkpointId == that.checkpointId + && subtaskId == that.subtaskId + && Objects.equals(committable, that.committable); + } + + @Override + public int hashCode() { + return Objects.hash(committable, checkpointId, subtaskId); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 028d8317d80..484d7a712c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -50,6 +50,7 @@ import java.util.Collection; import java.util.Collections; import java.util.OptionalLong; +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.util.IOUtils.closeAll; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -148,7 +149,7 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage endInput = true; if (!isCheckpointingEnabled || isBatchMode) { // There will be no final checkpoint, all committables should be committed here - 
notifyCheckpointComplete(Long.MAX_VALUE); + notifyCheckpointComplete(EOI); } } @@ -208,8 +209,8 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage // in case of unaligned checkpoint, we may receive notifyCheckpointComplete before the // committables - OptionalLong checkpointId = element.getValue().getCheckpointId(); - if (checkpointId.isPresent() && checkpointId.getAsLong() <= lastCompletedCheckpointId) { + long checkpointId = element.getValue().getCheckpointIdOrEOI(); + if (checkpointId <= lastCompletedCheckpointId) { commitAndEmitCheckpoints(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index ea9a60d6133..93f5ba66198 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -197,10 +197,10 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab public void endInput() throws Exception { endOfInput = true; sinkWriter.flush(true); - emitCommittables(Long.MAX_VALUE); + emitCommittables(CommittableMessage.EOI); } - private void emitCommittables(Long checkpointId) throws IOException, InterruptedException { + private void emitCommittables(long checkpointId) throws IOException, InterruptedException { if (!emitDownstream) { // To support SinkV1 topologies with only a writer we have to call prepareCommit // although no committables are forwarded diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java index 38abd2d3a33..7740a87da42 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java @@ -23,14 +23,11 @@ import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import javax.annotation.Nullable; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Objects; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -40,7 +37,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa /** Mapping of subtask id to {@link SubtaskCommittableManager}. */ private final Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers; - @Nullable private final Long checkpointId; + private final long checkpointId; private final int subtaskId; private final int numberOfSubtasks; private final SinkCommitterMetricGroup metricGroup; @@ -48,7 +45,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa CheckpointCommittableManagerImpl( int subtaskId, int numberOfSubtasks, - @Nullable Long checkpointId, + long checkpointId, SinkCommitterMetricGroup metricGroup) { this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId, metricGroup); } @@ -57,7 +54,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers, int subtaskId, int numberOfSubtasks, - @Nullable Long checkpointId, + long checkpointId, SinkCommitterMetricGroup metricGroup) { this.subtasksCommittableManagers = checkNotNull(subtasksCommittableManagers); this.subtaskId = 
subtaskId; @@ -68,7 +65,6 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa @Override public long getCheckpointId() { - checkNotNull(checkpointId); return checkpointId; } @@ -77,16 +73,14 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa } void upsertSummary(CommittableSummary<CommT> summary) { + SubtaskCommittableManager<CommT> manager = + new SubtaskCommittableManager<>( + summary.getNumberOfCommittables(), + subtaskId, + summary.getCheckpointIdOrEOI(), + metricGroup); SubtaskCommittableManager<CommT> existing = - subtasksCommittableManagers.putIfAbsent( - summary.getSubtaskId(), - new SubtaskCommittableManager<>( - summary.getNumberOfCommittables(), - subtaskId, - summary.getCheckpointId().isPresent() - ? summary.getCheckpointId().getAsLong() - : null, - metricGroup)); + subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager); if (existing != null) { throw new UnsupportedOperationException( "Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. 
Please check the status of FLINK-25920"); @@ -152,7 +146,7 @@ class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableMa } CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> other) { - checkArgument(Objects.equals(other.checkpointId, checkpointId)); + checkArgument(other.checkpointId == checkpointId); for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry : other.subtasksCommittableManagers.entrySet()) { subtasksCommittableManagers.merge( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java index 1b3d5bffc81..801c8446850 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.runtime.operators.sink.committables; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.connector.sink2.Sink.InitContext; +import org.apache.flink.api.connector.sink2.InitContext; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; @@ -157,8 +157,7 @@ public class CommittableCollector<CommT> { } /** - * Returns {@link CheckpointCommittableManager} that is currently hold by the collector and - * associated with the {@link CommittableCollector#EOI} checkpoint id. + * Returns {@link CommittableManager} belonging to the last input. 
* * @return {@link CheckpointCommittableManager} */ @@ -235,12 +234,12 @@ public class CommittableCollector<CommT> { private void addSummary(CommittableSummary<CommT> summary) { checkpointCommittables .computeIfAbsent( - summary.getCheckpointId().orElse(EOI), + summary.getCheckpointIdOrEOI(), key -> new CheckpointCommittableManagerImpl<>( subtaskId, numberOfSubtasks, - summary.getCheckpointId().orElse(EOI), + summary.getCheckpointIdOrEOI(), metricGroup)) .upsertSummary(summary); } @@ -252,7 +251,7 @@ public class CommittableCollector<CommT> { private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables( CommittableMessage<CommT> committable) { CheckpointCommittableManagerImpl<CommT> committables = - this.checkpointCommittables.get(committable.getCheckpointId().orElse(EOI)); + this.checkpointCommittables.get(committable.getCheckpointIdOrEOI()); return checkNotNull(committables, "Unknown checkpoint for %s", committable); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java index 1baadeb40a8..381cec977f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java @@ -22,8 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import javax.annotation.Nullable; - import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -42,7 +40,7 @@ import static org.apache.flink.util.Preconditions.checkState; class SubtaskCommittableManager<CommT> { private final 
Deque<CommitRequestImpl<CommT>> requests; private int numExpectedCommittables; - @Nullable private final Long checkpointId; + private final long checkpointId; private final int subtaskId; private int numDrained; private int numFailed; @@ -51,7 +49,7 @@ class SubtaskCommittableManager<CommT> { SubtaskCommittableManager( int numExpectedCommittables, int subtaskId, - @Nullable Long checkpointId, + long checkpointId, SinkCommitterMetricGroup metricGroup) { this( Collections.emptyList(), @@ -69,7 +67,7 @@ class SubtaskCommittableManager<CommT> { int numDrained, int numFailed, int subtaskId, - @Nullable Long checkpointId, + long checkpointId, SinkCommitterMetricGroup metricGroup) { this.checkpointId = checkpointId; this.subtaskId = subtaskId; @@ -179,8 +177,7 @@ class SubtaskCommittableManager<CommT> { } @VisibleForTesting - @Nullable - Long getCheckpointId() { + long getCheckpointId() { return checkpointId; } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java index b3850b41935..4c7b0d7096c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java @@ -40,7 +40,7 @@ class CommittableMessageSerializerTest { assertThat(message).isInstanceOf(CommittableWithLineage.class); final CommittableWithLineage<Integer> copy = (CommittableWithLineage<Integer>) message; assertThat(copy.getCommittable()).isEqualTo(1); - assertThat(copy.getCheckpointId()).isPresent().hasValue(2L); + assertThat(copy.getCheckpointIdOrEOI()).isEqualTo(2L); assertThat(copy.getSubtaskId()).isEqualTo(3); } @@ -56,7 +56,7 @@ class CommittableMessageSerializerTest { final CommittableSummary<Integer> copy = (CommittableSummary<Integer>) 
message; assertThat(copy.getSubtaskId()).isEqualTo(1); assertThat(copy.getNumberOfSubtasks()).isEqualTo(2); - assertThat(copy.getCheckpointId()).isPresent().hasValue(3L); + assertThat(copy.getCheckpointIdOrEOI()).isEqualTo(3L); assertThat(copy.getNumberOfCommittables()).isEqualTo(4); assertThat(copy.getNumberOfPendingCommittables()).isEqualTo(5); assertThat(copy.getNumberOfFailedCommittables()).isEqualTo(6); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java index fb1062a5091..e02e838aeb3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java @@ -20,8 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2; import org.assertj.core.api.AbstractAssert; -import javax.annotation.Nullable; - import static org.assertj.core.api.Assertions.assertThat; /** Custom assertions for {@link CommittableSummary}. 
*/ @@ -75,13 +73,9 @@ public class CommittableSummaryAssert return this; } - public CommittableSummaryAssert hasCheckpointId(@Nullable Long checkpointId) { + public CommittableSummaryAssert hasCheckpointId(long checkpointId) { isNotNull(); - if (checkpointId == null) { - assertThat(actual.getCheckpointId()).isEmpty(); - } else { - assertThat(actual.getCheckpointId()).hasValue(checkpointId); - } + assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId); return this; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java index 966b4330c19..0937c7454d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java @@ -20,8 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2; import org.assertj.core.api.AbstractAssert; -import javax.annotation.Nullable; - import static org.assertj.core.api.Assertions.assertThat; /** @@ -38,7 +36,8 @@ public class CommittableWithLinageAssert public CommittableWithLinageAssert isEqualTo(CommittableWithLineage<?> committableWithLineage) { isNotNull(); assertThat(actual.getSubtaskId()).isEqualTo(committableWithLineage.getSubtaskId()); - assertThat(actual.getCheckpointId()).isEqualTo(committableWithLineage.getCheckpointId()); + assertThat(actual.getCheckpointIdOrEOI()) + .isEqualTo(committableWithLineage.getCheckpointIdOrEOI()); assertThat(actual.getCommittable()).isEqualTo(committableWithLineage.getCommittable()); return this; } @@ -49,13 +48,9 @@ public class CommittableWithLinageAssert return this; } - public CommittableWithLinageAssert hasCheckpointId(@Nullable Long checkpointId) { + public CommittableWithLinageAssert hasCheckpointId(long checkpointId) { isNotNull(); - 
if (checkpointId == null) { - assertThat(actual.getCheckpointId()).isEmpty(); - } else { - assertThat(actual.getCheckpointId()).hasValue(checkpointId); - } + assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId); return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java index 441a2b437a0..dc45e939b1c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class GlobalCommitterOperatorTest { @@ -97,15 +98,15 @@ class GlobalCommitterOperatorTest { testHarness.open(); final CommittableSummary<Integer> committableSummary = - new CommittableSummary<>(1, 2, null, 1, 1, 0); + new CommittableSummary<>(1, 2, EOI, 1, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableSummary<Integer> committableSummary2 = - new CommittableSummary<>(2, 2, null, 1, 1, 0); + new CommittableSummary<>(2, 2, EOI, 1, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary2)); - final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, null, 1); + final CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, EOI, 1); testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, null, 2); + final CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, EOI, 2); testHarness.processElement(new 
StreamRecord<>(second)); testHarness.endInput(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java index 5af5efed1a9..3571e60f2c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions; import org.junit.jupiter.api.Test; +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.assertj.core.api.Assertions.assertThat; class CommittableCollectorTest { @@ -50,13 +51,12 @@ class CommittableCollectorTest { void testGetEndOfInputCommittable() { final CommittableCollector<Integer> committableCollector = new CommittableCollector<>(1, 1, METRIC_GROUP); - CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, null, 1, 0, 0); + CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, EOI, 1, 0, 0); committableCollector.addMessage(first); CommittableManager<Integer> endOfInputCommittable = committableCollector.getEndOfInputCommittable(); assertThat(endOfInputCommittable).isNotNull(); - SinkV2Assertions.assertThat(endOfInputCommittable.getSummary()) - .hasCheckpointId(Long.MAX_VALUE); + SinkV2Assertions.assertThat(endOfInputCommittable.getSummary()).hasCheckpointId(EOI); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java index d1269ed89c1..6e7674b47a9 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java @@ -35,6 +35,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.util.List; import java.util.function.IntSupplier; +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput; import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableSummary; import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableWithLinage; @@ -171,15 +172,15 @@ abstract class CommitterOperatorTestBase { testHarness.open(); final CommittableSummary<String> committableSummary = - new CommittableSummary<>(1, 2, null, 1, 1, 0); + new CommittableSummary<>(1, 2, EOI, 1, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary)); final CommittableSummary<String> committableSummary2 = - new CommittableSummary<>(2, 2, null, 1, 1, 0); + new CommittableSummary<>(2, 2, EOI, 1, 1, 0); testHarness.processElement(new StreamRecord<>(committableSummary2)); - final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", null, 1); + final CommittableWithLineage<String> first = new CommittableWithLineage<>("1", EOI, 1); testHarness.processElement(new StreamRecord<>(first)); - final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", null, 2); + final CommittableWithLineage<String> second = new CommittableWithLineage<>("1", EOI, 2); testHarness.processElement(new StreamRecord<>(second)); testHarness.endInput(); @@ -330,11 +331,7 @@ abstract class CommitterOperatorTestBase { CommittableWithLineage<?> copyCommittableWithDifferentOrigin( CommittableWithLineage<?> committable, int subtaskId) { return new CommittableWithLineage<>( - 
committable.getCommittable(), - committable.getCheckpointId().isPresent() - ? committable.getCheckpointId().getAsLong() - : null, - subtaskId); + committable.getCommittable(), committable.getCheckpointIdOrEOI(), subtaskId); } private OneInputStreamOperatorTestHarness< diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java index ecd7a3e54f3..4ede8a6ee83 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java @@ -54,8 +54,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import javax.annotation.Nullable; - import java.io.IOException; import java.lang.reflect.Proxy; import java.util.ArrayList; @@ -71,6 +69,7 @@ import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Supplier; +import static org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI; import static org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput; import static org.assertj.core.api.Assertions.assertThat; @@ -178,7 +177,7 @@ abstract class SinkWriterOperatorTestBase { testHarness.processElement(1, 1); testHarness.endInput(); - assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE); + assertBasicOutput(testHarness.getOutput(), 1, EOI); } @ParameterizedTest @@ -222,7 +221,7 @@ abstract class SinkWriterOperatorTestBase { restoredTestHarness.notifyOfCompletedCheckpoint(checkpointId); if (stateful) { - assertBasicOutput(restoredTestHarness.getOutput(), 2, Long.MAX_VALUE); + assertBasicOutput(restoredTestHarness.getOutput(), 2, EOI); } else { 
assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue()) .isInstanceOf(CommittableSummary.class) @@ -565,9 +564,7 @@ abstract class SinkWriterOperatorTestBase { } private static void assertBasicOutput( - Collection<Object> queuedOutput, - int numberOfCommittables, - @Nullable Long checkpointId) { + Collection<Object> queuedOutput, int numberOfCommittables, long checkpointId) { List<StreamElement> output = fromOutput(queuedOutput); assertThat(output).hasSize(numberOfCommittables + 1); assertThat(output.get(0).asRecord().getValue())
