This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c56def03b1debc1c26759d57c017ae19b3a05eea
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Sep 5 13:21:40 2024 +0200

    [FLINK-25920] Straighten EOI handling in CommittableCollector
    
    In some parts of the sink, EOI is treated as checkpointId=null and in some 
checkpointId=MAX. The code of CheckpointCommittableManagerImpl implies that a 
null is valid however the serializer actually breaks then. In practice, 
checkpointId=MAX is used all the time by accident.
    
    This commit replaces the nullable checkpointIds with a primitive long 
EOI=MAX, so that we always use the special value instead of null. The 
serializer already used that value, so it actually simplifies many places and 
doesn't break any existing state.
---
 .../f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e           | 148 ++++++++++-----------
 .../sink/compactor/operator/CompactorOperator.java |  10 +-
 .../operator/CompactorOperatorStateHandler.java    |  10 +-
 .../api/connector/sink2/CommittableMessage.java    |  22 ++-
 .../sink2/CommittableMessageSerializer.java        |  19 +--
 .../sink2/CommittableMessageTypeInfo.java          |   2 +-
 .../api/connector/sink2/CommittableSummary.java    |  58 +++++++-
 .../connector/sink2/CommittableWithLineage.java    |  43 +++++-
 .../runtime/operators/sink/CommitterOperator.java  |   7 +-
 .../runtime/operators/sink/SinkWriterOperator.java |   4 +-
 .../CheckpointCommittableManagerImpl.java          |  28 ++--
 .../sink/committables/CommittableCollector.java    |  11 +-
 .../committables/SubtaskCommittableManager.java    |  11 +-
 .../sink2/CommittableMessageSerializerTest.java    |   4 +-
 .../connector/sink2/CommittableSummaryAssert.java  |  10 +-
 .../sink2/CommittableWithLinageAssert.java         |  13 +-
 .../sink2/GlobalCommitterOperatorTest.java         |   9 +-
 .../committables/CommittableCollectorTest.java     |   6 +-
 .../operators/sink/CommitterOperatorTestBase.java  |  15 +--
 .../operators/sink/SinkWriterOperatorTestBase.java |  11 +-
 20 files changed, 245 insertions(+), 196 deletions(-)

diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
index 107dc673280..91d8cab4e67 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/f7a4e6fa-e7de-48c9-a61e-c13e83f0c72e
@@ -54,9 +54,9 @@ Constructor 
<org.apache.flink.connector.datagen.table.types.DataGeneratorMapper.
 Constructor 
<org.apache.flink.connector.datagen.table.types.RowDataGenerator.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.util.List, float)> depends on component type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(RowDataGenerator.java:0)
 Constructor 
<org.apache.flink.connector.datagen.table.types.RowDataGenerator.<init>([Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;,
 java.util.List, float)> has parameter of type 
<[Lorg.apache.flink.streaming.api.functions.source.datagen.DataGenerator;> in 
(RowDataGenerator.java:0)
 Constructor 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.<init>(org.apache.flink.core.fs.Path,
 long, org.apache.flink.api.common.serialization.BulkWriter$Factory, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner, 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy,
 org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileCo [...]
-Constructor 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.<init>(org.apache.flink.core.fs.Path,
 org.apache.flink.api.common.serialization.BulkWriter$Factory, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls 
method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
 in (FileSink.java:533)
+Constructor 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.<init>(org.apache.flink.core.fs.Path,
 org.apache.flink.api.common.serialization.BulkWriter$Factory, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls 
method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
 in (FileSink.java:548)
 Constructor 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.<init>(org.apache.flink.core.fs.Path,
 long, org.apache.flink.api.common.serialization.Encoder, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner, 
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, 
org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has 
parameter of type <org.apac [...]
-Constructor 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.<init>(org.apache.flink.core.fs.Path,
 org.apache.flink.api.common.serialization.Encoder, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls 
method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
 in (FileSink.java:355)
+Constructor 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.<init>(org.apache.flink.core.fs.Path,
 org.apache.flink.api.common.serialization.Encoder, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner)> calls 
method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
 in (FileSink.java:370)
 Constructor 
<org.apache.flink.connector.file.sink.FileSinkCommittable.<init>(java.lang.String,
 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable)>
 has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable>
 in (FileSinkCommittable.java:0)
 Constructor 
<org.apache.flink.connector.file.sink.FileSinkCommittable.<init>(java.lang.String,
 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable)>
 has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable>
 in (FileSinkCommittable.java:0)
 Constructor 
<org.apache.flink.connector.file.sink.FileSinkCommittable.<init>(java.lang.String,
 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable,
 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable,
 org.apache.flink.core.fs.Path)> has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable>
 in (FileSinkCommittable.java:0)
@@ -100,7 +100,7 @@ Constructor 
<org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAss
 Constructor 
<org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner.<init>(java.util.Collection)>
 calls constructor <org.apache.flink.metrics.SimpleCounter.<init>()> in 
(LocalityAwareSplitAssigner.java:81)
 Constructor 
<org.apache.flink.connector.file.src.impl.ContinuousFileSplitEnumerator.<init>(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 org.apache.flink.connector.file.src.enumerate.FileEnumerator, 
org.apache.flink.connector.file.src.assigners.FileSplitAssigner, 
[Lorg.apache.flink.core.fs.Path;, java.util.Collection, long)> has parameter of 
type <[Lorg.apache.flink.core.fs.Path;> in 
(ContinuousFileSplitEnumerator.java:0)
 Constructor 
<org.apache.flink.connector.file.table.ColumnarRowIterator.<init>(org.apache.flink.table.data.columnar.ColumnarRowData,
 java.lang.Runnable)> has parameter of type 
<org.apache.flink.table.data.columnar.ColumnarRowData> in 
(ColumnarRowIterator.java:0)
-Constructor 
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.<init>()> 
calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.<init>(java.lang.String,
 java.lang.String)> in (FileSystemOutputFormat.java:216)
+Constructor 
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.<init>()> 
calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.<init>(java.lang.String,
 java.lang.String)> in (FileSystemOutputFormat.java:235)
 Constructor 
<org.apache.flink.connector.file.table.FileSystemOutputFormat.<init>(org.apache.flink.connector.file.table.FileSystemFactory,
 org.apache.flink.connector.file.table.TableMetaStoreFactory, boolean, boolean, 
org.apache.flink.core.fs.Path, [Ljava.lang.String;, boolean, 
java.util.LinkedHashMap, 
org.apache.flink.connector.file.table.OutputFormatFactory, 
org.apache.flink.connector.file.table.PartitionComputer, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig [...]
 Constructor 
<org.apache.flink.connector.file.table.FileSystemOutputFormat.<init>(org.apache.flink.connector.file.table.FileSystemFactory,
 org.apache.flink.connector.file.table.TableMetaStoreFactory, boolean, boolean, 
org.apache.flink.core.fs.Path, [Ljava.lang.String;, boolean, 
java.util.LinkedHashMap, 
org.apache.flink.connector.file.table.OutputFormatFactory, 
org.apache.flink.connector.file.table.PartitionComputer, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig [...]
 Constructor 
<org.apache.flink.connector.file.table.PartitionTempFileManager.<init>(org.apache.flink.connector.file.table.FileSystemFactory,
 org.apache.flink.core.fs.Path, int, int)> calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.<init>(java.lang.String,
 java.lang.String)> in (PartitionTempFileManager.java:71)
@@ -174,8 +174,8 @@ Field 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.
 Field 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.writer>
 has type 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter> 
in (CompactBucketWriter.java:0)
 Field 
<org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.writer>
 has type <org.apache.flink.api.common.functions.util.PrintSinkOutputWriter> in 
(PrintTableSinkFactory.java:0)
 Method 
<org.apache.flink.connector.base.source.hybrid.HybridSource$HybridSourceBuilder.addSource(org.apache.flink.connector.base.source.hybrid.HybridSource$SourceFactory,
 org.apache.flink.api.connector.source.Boundedness)> calls method 
<org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, 
org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in 
(HybridSource.java:246)
-Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> calls method 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)>
 in (HybridSourceSplitEnumerator.java:448)
-Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> checks instanceof 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in 
(HybridSourceSplitEnumerator.java:440)
+Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> calls method 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits.signalIntermediateNoMoreSplits(int)>
 in (HybridSourceSplitEnumerator.java:450)
+Method 
<org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator.checkAndSignalNoMoreSplits(org.apache.flink.api.connector.source.SplitEnumeratorContext,
 int, int, int)> checks instanceof 
<org.apache.flink.api.connector.source.SupportsIntermediateNoMoreSplits> in 
(HybridSourceSplitEnumerator.java:442)
 Method 
<org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.getNumAliveFetchers()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(SplitFetcherManager.java:0)
 Method 
<org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue.take()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(FutureCompletingBlockingQueue.java:0)
 Method 
<org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction.getSerializer()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(FromElementsGeneratorFunction.java:0)
@@ -183,7 +183,7 @@ Method 
<org.apache.flink.connector.datagen.functions.IndexLookupGeneratorFunctio
 Method 
<org.apache.flink.connector.datagen.source.DataGeneratorSource.getGeneratorFunction()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(DataGeneratorSource.java:0)
 Method 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)>
 calls constructor 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.<init>(org.apache.flink.api.connector.source.SourceReader,
 org.apache.flink.api.connector.source.util.ratelimit.RateLimiter)> in 
(GeneratorSourceReaderFactory.java:63)
 Method 
<org.apache.flink.connector.datagen.source.GeneratorSourceReaderFactory.createReader(org.apache.flink.api.connector.source.SourceReaderContext)>
 calls method 
<org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy.createRateLimiter(int)>
 in (GeneratorSourceReaderFactory.java:62)
-Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> 
calls constructor 
<org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator,
 long, java.lang.Long)> in (DataGenTableSource.java:67)
+Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> 
calls constructor 
<org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.<init>(org.apache.flink.streaming.api.functions.source.datagen.DataGenerator,
 long, java.lang.Long)> in (DataGenTableSource.java:72)
 Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> 
has return type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource> 
in (DataGenTableSource.java:0)
 Method 
<org.apache.flink.connector.datagen.table.DataGenTableSource.createSource()> is 
annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(DataGenTableSource.java:0)
 Method 
<org.apache.flink.connector.datagen.table.DataGeneratorContainer.getGenerator()>
 has return type 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator> in 
(DataGeneratorContainer.java:0)
@@ -217,10 +217,10 @@ Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(or
 Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.TinyIntType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)>
 in (RandomGeneratorVisitor.java:192)
 Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarBinaryType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withVarLen(boolean)>
 in (RandomGeneratorVisitor.java:179)
 Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarCharType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)>
 in (RandomGeneratorVisitor.java:158)
-Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarCharType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withVarLen(boolean)>
 in (RandomGeneratorVisitor.java:172)
-Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.intGenerator(int,
 int)> in (RandomGeneratorVisitor.java:292)
-Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)>
 in (RandomGeneratorVisitor.java:293)
-Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.ZonedTimestampType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)>
 in (RandomGeneratorVisitor.java:331)
+Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.VarCharType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withVarLen(boolean)>
 in (RandomGeneratorVisitor.java:159)
+Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.intGenerator(int,
 int)> in (RandomGeneratorVisitor.java:287)
+Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.YearMonthIntervalType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)>
 in (RandomGeneratorVisitor.java:288)
+Method 
<org.apache.flink.connector.datagen.table.RandomGeneratorVisitor.visit(org.apache.flink.table.types.logical.ZonedTimestampType)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator.withNullRate(float)>
 in (RandomGeneratorVisitor.java:326)
 Method 
<org.apache.flink.connector.datagen.table.SequenceGeneratorVisitor$2.next()> 
calls method 
<org.apache.flink.shaded.guava32.com.google.common.primitives.Longs.toByteArray(long)>
 in (SequenceGeneratorVisitor.java:221)
 Method 
<org.apache.flink.connector.datagen.table.SequenceGeneratorVisitor.getSequenceBytesGenerator(long,
 long)> has return type 
<org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator> in 
(SequenceGeneratorVisitor.java:0)
 Method 
<org.apache.flink.connector.datagen.table.SequenceGeneratorVisitor.getSequenceStringGenerator(long,
 long)> has return type 
<org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator> in 
(SequenceGeneratorVisitor.java:0)
@@ -240,37 +240,37 @@ Method 
<org.apache.flink.connector.datagen.table.types.RowDataGenerator.next()>
 Method 
<org.apache.flink.connector.datagen.table.types.RowDataGenerator.open(java.lang.String,
 org.apache.flink.runtime.state.FunctionInitializationContext, 
org.apache.flink.api.common.functions.RuntimeContext)> calls method 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator.open(java.lang.String,
 org.apache.flink.runtime.state.FunctionInitializationContext, 
org.apache.flink.api.common.functions.RuntimeContext)> in 
(RowDataGenerator.java:54)
 Method 
<org.apache.flink.connector.datagen.table.types.RowDataGenerator.snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext)>
 calls method 
<org.apache.flink.streaming.api.functions.source.datagen.DataGenerator.snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext)>
 in (RowDataGenerator.java:61)
 Method 
<org.apache.flink.connector.file.sink.FileSink$BucketsBuilder.createBucketWriter()>
 has return type 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in 
(FileSink.java:0)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter()>
 calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter,
 org.apache.flink.api.common.serialization.BulkWriter$Factory)> in 
(FileSink.java:674)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter()>
 calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter,
 org.apache.flink.api.common.serialization.BulkWriter$Factory)> in 
(FileSink.java:698)
 Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createBucketWriter()>
 has return type 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in 
(FileSink.java:0)
 Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
 in (FileSink.java:638)
 Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartPrefix()>
 in (FileSink.java:639)
 Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.Sink$InitContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartSuffix()>
 in (FileSink.java:640)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:668)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:669)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:669)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:668)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:658)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:659)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:658)
-Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:659)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:692)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:693)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:693)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:692)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:682)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:683)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:682)
+Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:683)
 Method 
<org.apache.flink.connector.file.sink.FileSink$BulkFormatBuilder.withOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)>
 has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in 
(FileSink.java:0)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter()>
 calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter,
 org.apache.flink.api.common.serialization.Encoder)> in (FileSink.java:480)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter()>
 calls constructor 
<org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.<init>(org.apache.flink.core.fs.RecoverableWriter,
 org.apache.flink.api.common.serialization.Encoder)> in (FileSink.java:495)
 Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter()>
 has return type 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in 
(FileSink.java:0)
 Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.WriterInitContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
 in (FileSink.java:435)
 Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.WriterInitContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartPrefix()>
 in (FileSink.java:436)
 Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createWriter(org.apache.flink.api.connector.sink2.WriterInitContext)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.getPartSuffix()>
 in (FileSink.java:437)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:474)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:475)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:475)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:474)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:464)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:465)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:464)
-Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:465)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:489)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:490)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:490)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:489)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:479)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter.getProperties()>
 in (FileSink.java:480)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getInProgressFileRecoverableSerializer()>
 in (FileSink.java:479)
+Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getWriterStateSerializer()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties.getPendingFileRecoverableSerializer()>
 in (FileSink.java:480)
 Method 
<org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.withOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)>
 has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in 
(FileSink.java:0)
-Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, 
boolean)> in (FileSink.java:231)
-Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, 
boolean)> in (FileSink.java:243)
-Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, 
boolean)> in (FileSink.java:275)
+Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, 
boolean)> in (FileSink.java:246)
+Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, 
boolean)> in (FileSink.java:258)
+Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, 
boolean)> in (FileSink.java:290)
 Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 has generic parameter type 
<org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>>>
 with type argument depending on 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in 
(FileSink.java:0)
 Method 
<org.apache.flink.connector.file.sink.FileSink.addPreCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)>
 has generic return type 
<org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>>>
 with type argument depending on 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in 
(FileSink.java:0)
 Method 
<org.apache.flink.connector.file.sink.FileSinkCommittable.getInProgressFileToCleanup()>
 has return type 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable>
 in (FileSinkCommittable.java:0)
@@ -330,26 +330,26 @@ Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactService.g
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactService.getWriterType(org.apache.flink.connector.file.sink.compactor.FileCompactor)>
 gets field 
<org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter$Type.RECORD_WISE>
 in (CompactService.java:169)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactService.getWriterType(org.apache.flink.connector.file.sink.compactor.FileCompactor)>
 has return type 
<org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter$Type>
 in (CompactService.java:0)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactService.open()> 
calls method <org.apache.flink.runtime.util.Hardware.getNumberCPUCores()> in 
(CompactService.java:70)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserialize(int,
 [B)> calls constructor 
<org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in 
(CompactorOperator.java:301)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)>
 calls method <org.apache.flink.core.memory.DataInputDeserializer.readInt()> in 
(CompactorOperator.java:325)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)>
 calls method <org.apache.flink.core.memory.DataInputDeserializer.readLong()> 
in (CompactorOperator.java:328)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserialize(int,
 [B)> calls constructor 
<org.apache.flink.core.memory.DataInputDeserializer.<init>([B)> in 
(CompactorOperator.java:299)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)>
 calls method <org.apache.flink.core.memory.DataInputDeserializer.readInt()> in 
(CompactorOperator.java:323)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)>
 calls method <org.apache.flink.core.memory.DataInputDeserializer.readLong()> 
in (CompactorOperator.java:326)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.deserializeV1(org.apache.flink.core.memory.DataInputDeserializer)>
 has parameter of type <org.apache.flink.core.memory.DataInputDeserializer> in 
(CompactorOperator.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)>
 calls constructor 
<org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in 
(CompactorOperator.java:292)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)>
 calls method 
<org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer()> in 
(CompactorOperator.java:295)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)>
 calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> 
in (CompactorOperator.java:293)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map,
 org.apache.flink.core.memory.DataOutputSerializer)> calls method 
<org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> in 
(CompactorOperator.java:315)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map,
 org.apache.flink.core.memory.DataOutputSerializer)> calls method 
<org.apache.flink.core.memory.DataOutputSerializer.writeLong(long)> in 
(CompactorOperator.java:317)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)>
 calls constructor 
<org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in 
(CompactorOperator.java:290)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)>
 calls method 
<org.apache.flink.core.memory.DataOutputSerializer.getCopyOfBuffer()> in 
(CompactorOperator.java:293)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serialize(java.util.Map)>
 calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> 
in (CompactorOperator.java:291)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map,
 org.apache.flink.core.memory.DataOutputSerializer)> calls method 
<org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> in 
(CompactorOperator.java:313)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map,
 org.apache.flink.core.memory.DataOutputSerializer)> calls method 
<org.apache.flink.core.memory.DataOutputSerializer.writeLong(long)> in 
(CompactorOperator.java:315)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator$RemainingRequestsSerializer.serializeV1(java.util.Map,
 org.apache.flink.core.memory.DataOutputSerializer)> has parameter of type 
<org.apache.flink.core.memory.DataOutputSerializer> in 
(CompactorOperator.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, 
int, java.lang.Long, int, int, int)> in (CompactorOperator.java:254)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object,
 java.lang.Long, int)> in (CompactorOperator.java:262)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactorOperator.java:256)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactorOperator.java:263)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactorOperator.java:250)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactorOperator.java:251)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(java.lang.Long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactorOperator.java:262)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, 
int, long, int, int, int)> in (CompactorOperator.java:252)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object,
 long, int)> in (CompactorOperator.java:260)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactorOperator.java:254)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactorOperator.java:261)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactorOperator.java:248)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactorOperator.java:249)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.emitCompacted(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactorOperator.java:260)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.getAllTasksFuture()>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(CompactorOperator.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.streaming.api.operators.util.SimpleVersionedListState.<init>(org.apache.flink.api.common.state.ListState,
 org.apache.flink.core.io.SimpleVersionedSerializer)> in 
(CompactorOperator.java:187)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactorOperator.java:135)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.streaming.api.operators.util.SimpleVersionedListState.<init>(org.apache.flink.api.common.state.ListState,
 org.apache.flink.core.io.SimpleVersionedSerializer)> in 
(CompactorOperator.java:185)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactorOperator.java:133)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest>>
 with type argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactorOperator.java:0)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactorOperator.java:0)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)>
 calls method 
<org.apache.flink.streaming.api.graph.StreamConfig.getOperatorName()> in 
(CompactorOperatorFactory.java:84)
@@ -361,10 +361,12 @@ Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperato
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)>
 has generic parameter type 
<org.apache.flink.streaming.api.operators.StreamOperatorParameters<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>>>
 with type argument depending on 
<org.apache.flink.streaming.api.operators.StreamOperatorParame [...]
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)>
 has parameter of type 
<org.apache.flink.streaming.api.operators.StreamOperatorParameters> in 
(CompactorOperatorFactory.java:0)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory.createStreamOperator(org.apache.flink.streaming.api.operators.StreamOperatorParameters)>
 has type parameter 'T' depending on 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in 
(CompactorOperatorFactory.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, 
int, java.lang.Long, int, int, int)> in (CompactorOperatorStateHandler.java:177)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object,
 java.lang.Long, int)> in (CompactorOperatorStateHandler.java:184)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.<init>(int, 
int, long, int, int, int)> in (CompactorOperatorStateHandler.java:177)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object,
 long, int)> in (CompactorOperatorStateHandler.java:184)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactorOperatorStateHandler.java:177)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactorOperatorStateHandler.java:184)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getCheckpointIdOrEOI()>
 in (CompactorOperatorStateHandler.java:174)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getCheckpointIdOrEOI()>
 in (CompactorOperatorStateHandler.java:183)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getNumberOfCommittables()>
 in (CompactorOperatorStateHandler.java:175)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getNumberOfFailedCommittables()>
 in (CompactorOperatorStateHandler.java:177)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getNumberOfPendingCommittables()>
 in (CompactorOperatorStateHandler.java:176)
@@ -373,11 +375,9 @@ Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperato
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary.getSubtaskId()>
 in (CompactorOperatorStateHandler.java:184)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 has generic parameter type 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary<org.apache.flink.connector.file.sink.FileSinkCommittable>>
 with type argument depending on 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary> in 
(CompactorOperatorStateHandler.java:0)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.appendCompactingResultsToSummary(org.apache.flink.streaming.api.connector.sink2.CommittableSummary)>
 has parameter of type 
<org.apache.flink.streaming.api.connector.sink2.CommittableSummary> in 
(CompactorOperatorStateHandler.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.getCheckpointId(org.apache.flink.streaming.api.connector.sink2.CommittableMessage)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage.getCheckpointId()>
 in (CompactorOperatorStateHandler.java:279)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.getCheckpointId(org.apache.flink.streaming.api.connector.sink2.CommittableMessage)>
 has generic parameter type 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.file.sink.FileSinkCommittable>>
 with type argument depending on 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in 
(CompactorOperatorStateHandler.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.getCheckpointId(org.apache.flink.streaming.api.connector.sink2.CommittableMessage)>
 has parameter of type 
<org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in 
(CompactorOperatorStateHandler.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object,
 java.lang.Long, int)> in (CompactorOperatorStateHandler.java:218)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)>
 calls constructor 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.<init>(java.lang.Object,
 long, int)> in (CompactorOperatorStateHandler.java:218)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactorOperatorStateHandler.java:218)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.getCheckpointIdOrEOI()>
 in (CompactorOperatorStateHandler.java:207)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.getCommittable()>
 in (CompactorOperatorStateHandler.java:196)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)>
 calls method 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage.getSubtaskId()>
 in (CompactorOperatorStateHandler.java:218)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler.handleHiddenCommittable(org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage)>
 has generic parameter type 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage<org.apache.flink.connector.file.sink.FileSinkCommittable>>
 with type argument depending on 
<org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage> in 
(CompactorOperatorStateHandler.java:0)
@@ -417,7 +417,7 @@ Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer.serialize(org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest)>
 calls method <org.apache.flink.core.memory.DataOutputSerializer.writeInt(int)> 
in (CompactorRequestSerializer.java:53)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer.serializeV1(org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest,
 org.apache.flink.core.memory.DataOutputSerializer)> calls method 
<org.apache.flink.core.memory.DataOutputSerializer.writeUTF(java.lang.String)> 
in (CompactorRequestSerializer.java:73)
 Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer.serializeV1(org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest,
 org.apache.flink.core.memory.DataOutputSerializer)> has parameter of type 
<org.apache.flink.core.memory.DataOutputSerializer> in 
(CompactorRequestSerializer.java:0)
-Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestTypeInfo.createSerializer(org.apache.flink.api.common.serialization.SerializerConfig)>
 calls constructor 
<org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy.<init>(org.apache.flink.util.function.SerializableSupplier)>
 in (CompactorRequestTypeInfo.java:80)
+Method 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestTypeInfo.createSerializer(org.apache.flink.api.common.serialization.SerializerConfig)>
 calls constructor 
<org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy.<init>(org.apache.flink.util.function.SerializableSupplier)>
 in (CompactorRequestTypeInfo.java:81)
 Method 
<org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory.getNewBucket(java.lang.String,
 org.apache.flink.core.fs.Path, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, 
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has 
generic parameter type 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN, 
java.lang.String>> with ty [...]
 Method 
<org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory.getNewBucket(java.lang.String,
 org.apache.flink.core.fs.Path, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, 
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has 
parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in 
(DefaultFileWriterBucketFactory [...]
 Method 
<org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory.getNewBucket(java.lang.String,
 org.apache.flink.core.fs.Path, 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter, 
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy, 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)> has 
parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in 
(DefaultFileWriterBucketFac [...]
@@ -513,7 +513,7 @@ Method 
<org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapRe
 Method 
<org.apache.flink.connector.file.table.FileInfoExtractorBulkFormat.wrapReader(org.apache.flink.connector.file.src.reader.BulkFormat$Reader,
 org.apache.flink.connector.file.src.FileSourceSplit)> calls method 
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)>
 in (FileInfoExtractorBulkFormat.java:140)
 Method 
<org.apache.flink.connector.file.table.FileSystemCommitter.commitPartitionsWithFiles(java.util.Map)>
 calls method 
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)>
 in (FileSystemCommitter.java:146)
 Method 
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setOutputFileConfig(org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)>
 has parameter of type 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig> in 
(FileSystemOutputFormat.java:0)
-Method 
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(FileSystemOutputFormat.java:291)
+Method 
<org.apache.flink.connector.file.table.FileSystemOutputFormat$Builder.setStagingPath(org.apache.flink.core.fs.Path)>
 is annotated with <org.apache.flink.annotation.VisibleForTesting> in 
(FileSystemOutputFormat.java:0)
 Method 
<org.apache.flink.connector.file.table.FileSystemTableSink$TableBucketAssigner.getBucketId(org.apache.flink.table.data.RowData,
 
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner$Context)>
 calls method 
<org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)>
 in (FileSystemTableSink.java:553)
 Method 
<org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream,
 org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> 
calls method <org.apache.flink.api.dag.Transformation.setParallelism(int, 
boolean)> in (FileSystemTableSink.java:208)
 Method 
<org.apache.flink.connector.file.table.FileSystemTableSink.createBatchSink(org.apache.flink.streaming.api.datastream.DataStream,
 org.apache.flink.table.connector.sink.DynamicTableSink$Context, int, boolean)> 
calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig.builder()>
 in (FileSystemTableSink.java:189)
@@ -589,7 +589,7 @@ Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPa
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(PartitionCommitter.java:140)
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.PartitionCommitInfo>>
 with type argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(PartitionCommitter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.PartitionCommitter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(PartitionCommitter.java:0)
-Method 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitPredicate.isPartitionCommittable(org.apache.flink.connector.file.table.stream.PartitionCommitPredicate$PredicateContext)>
 calls method 
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues(org.apache.flink.core.fs.Path)>
 in (PartitionTimeCommitPredicate.java:71)
+Method 
<org.apache.flink.connector.file.table.stream.PartitionTimeCommitPredicate.isPartitionCommittable(org.apache.flink.connector.file.table.stream.PartitionCommitPredicate$PredicateContext)>
 calls method 
<org.apache.flink.table.utils.PartitionPathUtils.extractPartitionValues(org.apache.flink.core.fs.Path)>
 in (PartitionTimeCommitPredicate.java:70)
 Method 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger$1.currentProcTime()>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (ProcTimeCommitTrigger.java:111)
 Method 
<org.apache.flink.connector.file.table.stream.ProcTimeCommitTrigger.addPartition(java.lang.String)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()>
 in (ProcTimeCommitTrigger.java:76)
 Method 
<org.apache.flink.connector.file.table.stream.StreamingFileWriter.closePartFileForPartitions()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.closePartFileForBucket(java.lang.Object)>
 in (StreamingFileWriter.java:130)
@@ -607,33 +607,33 @@ Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.commit()>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.closeForCommit()>
 in (CompactBucketWriter.java:49)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.factory(org.apache.flink.util.function.SupplierWithException)>
 has generic parameter type 
<org.apache.flink.util.function.SupplierWithException<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<T,
 java.lang.String>, java.io.IOException>> with type argument depending on 
<org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter> in 
(CompactBucketWriter.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactBucketWriter.write(java.lang.Object)>
 calls method 
<org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.write(java.lang.Object,
 long)> in (CompactBucketWriter.java:44)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:190)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:198)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactCoordinator.java:116)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactCoordinator.java:116)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:192)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.coordinate(long,
 java.util.Map)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactCoordinator.java:200)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactCoordinator.java:118)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactCoordinator.java:118)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class,
 org.apache.flink.api.common.serialization.SerializerConfig)> in 
(CompactCoordinator.java:118)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(CompactCoordinator.java:107)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.StringSerializer.INSTANCE> in 
(CompactCoordinator.java:107)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactCoordinator.java:127)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactCoordinator.java:129)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorInput>>
 with type argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactCoordinator.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactCoordinator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactCoordinator.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactFileWriter.java:63)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactFileWriter.java:62)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.commitUpToCheckpoint(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactFileWriter.java:63)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactFileWriter.onPartFileOpened(java.lang.String,
 org.apache.flink.core.fs.Path)> calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactFileWriter.java:52)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactOperator.java:162)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:160)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:161)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactOperator.java:109)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactOperator.java:109)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls constructor 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.<init>(java.lang.Object)>
 in (CompactOperator.java:164)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:162)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.endCompaction(long)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:163)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)>
 in (CompactOperator.java:111)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer,
 org.apache.flink.api.common.typeutils.TypeSerializer)> in 
(CompactOperator.java:111)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 calls constructor 
<org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(java.lang.Class,
 org.apache.flink.api.common.serialization.SerializerConfig)> in 
(CompactOperator.java:111)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(org.apache.flink.runtime.state.StateInitializationContext)>
 gets field 
<org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in 
(CompactOperator.java:103)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in 
(CompactOperator.java:139)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()>
 in (CompactOperator.java:140)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:125)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:126)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactOperator.java:121)
-Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in 
(CompactOperator.java:138)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.execution.Environment.getTaskManagerInfo()> in 
(CompactOperator.java:141)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo.getConfiguration()>
 in (CompactOperator.java:142)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:127)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (CompactOperator.java:128)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in 
(CompactOperator.java:123)
+Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 calls method 
<org.apache.flink.streaming.runtime.tasks.StreamTask.getEnvironment()> in 
(CompactOperator.java:140)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has generic parameter type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord<org.apache.flink.connector.file.table.stream.compact.CompactMessages$CoordinatorOutput>>
 with type argument depending on 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactOperator.java:0)
 Method 
<org.apache.flink.connector.file.table.stream.compact.CompactOperator.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)>
 has parameter of type 
<org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in 
(CompactOperator.java:0)
 Method 
<org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.invoke(org.apache.flink.table.data.RowData,
 org.apache.flink.streaming.api.functions.sink.SinkFunction$Context)> calls 
method 
<org.apache.flink.api.common.functions.util.PrintSinkOutputWriter.write(java.lang.Object)>
 in (PrintTableSinkFactory.java:189)
@@ -641,7 +641,7 @@ Method 
<org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrin
 Method 
<org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.api.common.functions.OpenContext)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (PrintTableSinkFactory.java:181)
 Method 
<org.apache.flink.connector.print.table.PrintTableSinkFactory$RowDataPrintFunction.open(org.apache.flink.api.common.functions.OpenContext)>
 calls method 
<org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()>
 in (PrintTableSinkFactory.java:182)
 Static Initializer 
<org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE>
 in (CompactCoordinator.java:67)
-Static Initializer 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE>
 in (CompactorOperator.java:85)
+Static Initializer 
<org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator.<clinit>()>
 gets field 
<org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.INSTANCE>
 in (CompactorOperator.java:83)
 Static Initializer 
<org.apache.flink.connector.file.src.FileSourceSplitSerializer.<clinit>()> 
calls constructor 
<org.apache.flink.core.memory.DataOutputSerializer.<init>(int)> in 
(FileSourceSplitSerializer.java:42)
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory.getInstance()>
 in (StandardDeCompressors.java:45)
 Static Initializer 
<org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()>
 calls method 
<org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()>
 in (StandardDeCompressors.java:43)
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
index cf9f06252f5..0cb573d466f 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
@@ -46,8 +46,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -138,15 +136,15 @@ public class CompactorOperator
     @Override
     public void endInput() throws Exception {
         // add collecting requests into the final snapshot
-        checkpointRequests.put(Long.MAX_VALUE, collectingRequests);
+        checkpointRequests.put(CommittableMessage.EOI, collectingRequests);
         collectingRequests = new ArrayList<>();
 
         // submit all requests and wait until they are done
-        submitUntil(Long.MAX_VALUE);
+        submitUntil(CommittableMessage.EOI);
         assert checkpointRequests.isEmpty();
 
         getAllTasksFuture().join();
-        emitCompacted(null);
+        emitCompacted(CommittableMessage.EOI);
         assert compactingRequests.isEmpty();
     }
 
@@ -223,7 +221,7 @@ public class CompactorOperator
         canSubmit.clear();
     }
 
-    private void emitCompacted(@Nullable Long checkpointId) throws Exception {
+    private void emitCompacted(long checkpointId) throws Exception {
         List<FileSinkCommittable> compacted = new ArrayList<>();
         Iterator<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>> iter =
                 compactingRequests.iterator();
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
index 7f8108db8ff..02bcbbdcac7 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperatorStateHandler.java
@@ -171,7 +171,7 @@ public class CompactorOperatorStateHandler
                         new CommittableSummary<>(
                                 summary.getSubtaskId(),
                                 summary.getNumberOfSubtasks(),
-                                getCheckpointId(summary),
+                                summary.getCheckpointIdOrEOI(),
                                 summary.getNumberOfCommittables() + 
results.size(),
                                 summary.getNumberOfPendingCommittables() + 
results.size(),
                                 summary.getNumberOfFailedCommittables())));
@@ -180,7 +180,7 @@ public class CompactorOperatorStateHandler
                     new StreamRecord<>(
                             new CommittableWithLineage<>(
                                     committable,
-                                    getCheckpointId(summary),
+                                    summary.getCheckpointIdOrEOI(),
                                     summary.getSubtaskId())));
         }
     }
@@ -204,7 +204,7 @@ public class CompactorOperatorStateHandler
         // cleanup request to the next summary, since the count of pending 
committable
         // for this checkpoint is immutable now
         Iterable<FileSinkCommittable> result = submit(request).get();
-        Long checkpointId = getCheckpointId(message);
+        Long checkpointId = message.getCheckpointIdOrEOI();
         boolean pendingFileSent = false;
         for (FileSinkCommittable c : result) {
             if (c.hasPendingFile()) {
@@ -275,10 +275,6 @@ public class CompactorOperatorStateHandler
         remainingRequestsState.update(Collections.singletonList(requestsMap));
     }
 
-    private Long getCheckpointId(CommittableMessage<FileSinkCommittable> 
message) {
-        return message.getCheckpointId().isPresent() ? 
message.getCheckpointId().getAsLong() : null;
-    }
-
     private CompletableFuture<Iterable<FileSinkCommittable>> 
submit(CompactorRequest request) {
         CompletableFuture<Iterable<FileSinkCommittable>> resultFuture = new 
CompletableFuture<>();
         compactService.submit(request, resultFuture);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
index 754b5e374f4..7db0c29ecc6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessage.java
@@ -22,15 +22,33 @@ import org.apache.flink.annotation.Experimental;
 
 import java.util.OptionalLong;
 
-/** The message send from {@link SinkWriter} to {@link Committer}. */
+/** The message send from {@code SinkWriter} to {@code Committer}. */
 @Experimental
 public interface CommittableMessage<CommT> {
+    /**
+     * Special value for checkpointId for the end of input in case of batch 
commit or final
+     * checkpoint.
+     */
+    long EOI = Long.MAX_VALUE;
+
     /** The subtask that created this committable. */
     int getSubtaskId();
 
     /**
      * Returns the checkpoint id or empty if the message does not belong to a 
checkpoint. In that
      * case, the committable was created at the end of input (e.g., in batch 
mode).
+     *
+     * @see #getCheckpointIdOrEOI()
+     */
+    @Deprecated
+    default OptionalLong getCheckpointId() {
+        long checkpointIdOrEOI = getCheckpointIdOrEOI();
+        return checkpointIdOrEOI == EOI ? OptionalLong.empty() : 
OptionalLong.of(checkpointIdOrEOI);
+    }
+
+    /**
+     * Returns the checkpoint id or EOI if this message belong to the final 
checkpoint or the batch
+     * commit.
      */
-    OptionalLong getCheckpointId();
+    long getCheckpointIdOrEOI();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java
index 0fac7380dd9..cf251dd2d84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializer.java
@@ -42,7 +42,6 @@ public class CommittableMessageSerializer<CommT>
     @VisibleForTesting static final int VERSION = 1;
     private static final int COMMITTABLE = 1;
     private static final int SUMMARY = 2;
-    private static final long EOI = Long.MAX_VALUE;
 
     private final SimpleVersionedSerializer<CommT> committableSerializer;
 
@@ -64,14 +63,14 @@ public class CommittableMessageSerializer<CommT>
                     committableSerializer,
                     ((CommittableWithLineage<CommT>) obj).getCommittable(),
                     out);
-            writeCheckpointId(out, obj);
+            out.writeLong(obj.getCheckpointIdOrEOI());
             out.writeInt(obj.getSubtaskId());
         } else if (obj instanceof CommittableSummary) {
             out.writeByte(SUMMARY);
             out.writeInt(obj.getSubtaskId());
             CommittableSummary<?> committableSummary = (CommittableSummary<?>) 
obj;
             out.writeInt(committableSummary.getNumberOfSubtasks());
-            writeCheckpointId(out, obj);
+            out.writeLong(obj.getCheckpointIdOrEOI());
             out.writeInt(committableSummary.getNumberOfCommittables());
             out.writeInt(committableSummary.getNumberOfPendingCommittables());
             out.writeInt(committableSummary.getNumberOfFailedCommittables());
@@ -91,13 +90,13 @@ public class CommittableMessageSerializer<CommT>
                 return new CommittableWithLineage<>(
                         SimpleVersionedSerialization.readVersionAndDeSerialize(
                                 committableSerializer, in),
-                        readCheckpointId(in),
+                        in.readLong(),
                         in.readInt());
             case SUMMARY:
                 return new CommittableSummary<>(
                         in.readInt(),
                         in.readInt(),
-                        readCheckpointId(in),
+                        in.readLong(),
                         in.readInt(),
                         in.readInt(),
                         in.readInt());
@@ -109,14 +108,4 @@ public class CommittableMessageSerializer<CommT>
                                 + StringUtils.byteToHexString(serialized));
         }
     }
-
-    private void writeCheckpointId(DataOutputSerializer out, 
CommittableMessage<CommT> obj)
-            throws IOException {
-        out.writeLong(obj.getCheckpointId().orElse(EOI));
-    }
-
-    private Long readCheckpointId(DataInputDeserializer in) throws IOException 
{
-        long checkpointId = in.readLong();
-        return checkpointId == EOI ? null : checkpointId;
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java
index ad8460ee9eb..c22868a7cf7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfo.java
@@ -50,7 +50,6 @@ public class CommittableMessageTypeInfo<CommT> extends 
TypeInformation<Committab
      * @param committableSerializerFactory factory to create the serializer 
for a {@link
      *     CommittableMessage}
      * @param <CommT> type of the committable
-     * @return
      */
     public static <CommT> TypeInformation<CommittableMessage<CommT>> of(
             SerializableSupplier<SimpleVersionedSerializer<CommT>> 
committableSerializerFactory) {
@@ -86,6 +85,7 @@ public class CommittableMessageTypeInfo<CommT> extends 
TypeInformation<Committab
         return 1;
     }
 
+    @SuppressWarnings({"unchecked", "rawtypes"})
     @Override
     public Class<CommittableMessage<CommT>> getTypeClass() {
         return (Class) CommittableMessage.class;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
index 7171a5168a7..7f1fb003806 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummary.java
@@ -20,9 +20,7 @@ package org.apache.flink.streaming.api.connector.sink2;
 
 import org.apache.flink.annotation.Experimental;
 
-import javax.annotation.Nullable;
-
-import java.util.OptionalLong;
+import java.util.Objects;
 
 /**
  * This class tracks the information about committables belonging to one 
checkpoint coming from one
@@ -38,7 +36,7 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
     /** May change after recovery. */
     private final int numberOfSubtasks;
 
-    @Nullable private final Long checkpointId;
+    private final long checkpointId;
     /** The number of committables coming from the given subtask in the 
particular checkpoint. */
     private final int numberOfCommittables;
     /** The number of committables that have not been successfully committed. 
*/
@@ -49,7 +47,7 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
     public CommittableSummary(
             int subtaskId,
             int numberOfSubtasks,
-            @Nullable Long checkpointId,
+            long checkpointId,
             int numberOfCommittables,
             int numberOfPendingCommittables,
             int numberOfFailedCommittables) {
@@ -69,8 +67,8 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
         return numberOfSubtasks;
     }
 
-    public OptionalLong getCheckpointId() {
-        return checkpointId == null ? OptionalLong.empty() : 
OptionalLong.of(checkpointId);
+    public long getCheckpointIdOrEOI() {
+        return checkpointId;
     }
 
     public int getNumberOfCommittables() {
@@ -94,4 +92,50 @@ public class CommittableSummary<CommT> implements 
CommittableMessage<CommT> {
                 numberOfPendingCommittables,
                 numberOfFailedCommittables);
     }
+
+    @Override
+    public String toString() {
+        return "CommittableSummary{"
+                + "subtaskId="
+                + subtaskId
+                + ", numberOfSubtasks="
+                + numberOfSubtasks
+                + ", checkpointId="
+                + checkpointId
+                + ", numberOfCommittables="
+                + numberOfCommittables
+                + ", numberOfPendingCommittables="
+                + numberOfPendingCommittables
+                + ", numberOfFailedCommittables="
+                + numberOfFailedCommittables
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CommittableSummary<?> that = (CommittableSummary<?>) o;
+        return subtaskId == that.subtaskId
+                && numberOfSubtasks == that.numberOfSubtasks
+                && checkpointId == that.checkpointId
+                && numberOfCommittables == that.numberOfCommittables
+                && numberOfPendingCommittables == 
that.numberOfPendingCommittables
+                && numberOfFailedCommittables == 
that.numberOfFailedCommittables;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                subtaskId,
+                numberOfSubtasks,
+                checkpointId,
+                numberOfCommittables,
+                numberOfPendingCommittables,
+                numberOfFailedCommittables);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
index 96683bfd2ca..bdfe9de262f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLineage.java
@@ -21,9 +21,7 @@ package org.apache.flink.streaming.api.connector.sink2;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
 
-import javax.annotation.Nullable;
-
-import java.util.OptionalLong;
+import java.util.Objects;
 import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -35,10 +33,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Experimental
 public class CommittableWithLineage<CommT> implements 
CommittableMessage<CommT> {
     private final CommT committable;
-    @Nullable private final Long checkpointId;
+    private final long checkpointId;
     private final int subtaskId;
 
-    public CommittableWithLineage(CommT committable, @Nullable Long 
checkpointId, int subtaskId) {
+    public CommittableWithLineage(CommT committable, long checkpointId, int 
subtaskId) {
         this.committable = checkNotNull(committable);
         this.checkpointId = checkpointId;
         this.subtaskId = subtaskId;
@@ -52,11 +50,42 @@ public class CommittableWithLineage<CommT> implements 
CommittableMessage<CommT>
         return subtaskId;
     }
 
-    public OptionalLong getCheckpointId() {
-        return checkpointId == null ? OptionalLong.empty() : 
OptionalLong.of(checkpointId);
+    public long getCheckpointIdOrEOI() {
+        return checkpointId;
     }
 
     public <NewCommT> CommittableWithLineage<NewCommT> map(Function<CommT, 
NewCommT> mapper) {
         return new CommittableWithLineage<>(mapper.apply(committable), 
checkpointId, subtaskId);
     }
+
+    @Override
+    public String toString() {
+        return "CommittableWithLineage{"
+                + "committable="
+                + committable
+                + ", checkpointId="
+                + checkpointId
+                + ", subtaskId="
+                + subtaskId
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CommittableWithLineage<?> that = (CommittableWithLineage<?>) o;
+        return checkpointId == that.checkpointId
+                && subtaskId == that.subtaskId
+                && Objects.equals(committable, that.committable);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(committable, checkpointId, subtaskId);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
index 028d8317d80..484d7a712c3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java
@@ -50,6 +50,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.OptionalLong;
 
+import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -148,7 +149,7 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
         endInput = true;
         if (!isCheckpointingEnabled || isBatchMode) {
             // There will be no final checkpoint, all committables should be 
committed here
-            notifyCheckpointComplete(Long.MAX_VALUE);
+            notifyCheckpointComplete(EOI);
         }
     }
 
@@ -208,8 +209,8 @@ class CommitterOperator<CommT> extends 
AbstractStreamOperator<CommittableMessage
 
         // in case of unaligned checkpoint, we may receive 
notifyCheckpointComplete before the
         // committables
-        OptionalLong checkpointId = element.getValue().getCheckpointId();
-        if (checkpointId.isPresent() && checkpointId.getAsLong() <= 
lastCompletedCheckpointId) {
+        long checkpointId = element.getValue().getCheckpointIdOrEOI();
+        if (checkpointId <= lastCompletedCheckpointId) {
             commitAndEmitCheckpoints();
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
index ea9a60d6133..93f5ba66198 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java
@@ -197,10 +197,10 @@ class SinkWriterOperator<InputT, CommT> extends 
AbstractStreamOperator<Committab
     public void endInput() throws Exception {
         endOfInput = true;
         sinkWriter.flush(true);
-        emitCommittables(Long.MAX_VALUE);
+        emitCommittables(CommittableMessage.EOI);
     }
 
-    private void emitCommittables(Long checkpointId) throws IOException, 
InterruptedException {
+    private void emitCommittables(long checkpointId) throws IOException, 
InterruptedException {
         if (!emitDownstream) {
             // To support SinkV1 topologies with only a writer we have to call 
prepareCommit
             // although no committables are forwarded
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
index 38abd2d3a33..7740a87da42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.java
@@ -23,14 +23,11 @@ import 
org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -40,7 +37,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     /** Mapping of subtask id to {@link SubtaskCommittableManager}. */
     private final Map<Integer, SubtaskCommittableManager<CommT>> 
subtasksCommittableManagers;
 
-    @Nullable private final Long checkpointId;
+    private final long checkpointId;
     private final int subtaskId;
     private final int numberOfSubtasks;
     private final SinkCommitterMetricGroup metricGroup;
@@ -48,7 +45,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     CheckpointCommittableManagerImpl(
             int subtaskId,
             int numberOfSubtasks,
-            @Nullable Long checkpointId,
+            long checkpointId,
             SinkCommitterMetricGroup metricGroup) {
         this(new HashMap<>(), subtaskId, numberOfSubtasks, checkpointId, 
metricGroup);
     }
@@ -57,7 +54,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
             Map<Integer, SubtaskCommittableManager<CommT>> 
subtasksCommittableManagers,
             int subtaskId,
             int numberOfSubtasks,
-            @Nullable Long checkpointId,
+            long checkpointId,
             SinkCommitterMetricGroup metricGroup) {
         this.subtasksCommittableManagers = 
checkNotNull(subtasksCommittableManagers);
         this.subtaskId = subtaskId;
@@ -68,7 +65,6 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
 
     @Override
     public long getCheckpointId() {
-        checkNotNull(checkpointId);
         return checkpointId;
     }
 
@@ -77,16 +73,14 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     }
 
     void upsertSummary(CommittableSummary<CommT> summary) {
+        SubtaskCommittableManager<CommT> manager =
+                new SubtaskCommittableManager<>(
+                        summary.getNumberOfCommittables(),
+                        subtaskId,
+                        summary.getCheckpointIdOrEOI(),
+                        metricGroup);
         SubtaskCommittableManager<CommT> existing =
-                subtasksCommittableManagers.putIfAbsent(
-                        summary.getSubtaskId(),
-                        new SubtaskCommittableManager<>(
-                                summary.getNumberOfCommittables(),
-                                subtaskId,
-                                summary.getCheckpointId().isPresent()
-                                        ? summary.getCheckpointId().getAsLong()
-                                        : null,
-                                metricGroup));
+                
subtasksCommittableManagers.putIfAbsent(summary.getSubtaskId(), manager);
         if (existing != null) {
             throw new UnsupportedOperationException(
                     "Currently it is not supported to update the 
CommittableSummary for a checkpoint coming from the same subtask. Please check 
the status of FLINK-25920");
@@ -152,7 +146,7 @@ class CheckpointCommittableManagerImpl<CommT> implements 
CheckpointCommittableMa
     }
 
     CheckpointCommittableManagerImpl<CommT> 
merge(CheckpointCommittableManagerImpl<CommT> other) {
-        checkArgument(Objects.equals(other.checkpointId, checkpointId));
+        checkArgument(other.checkpointId == checkpointId);
         for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> subtaskEntry 
:
                 other.subtasksCommittableManagers.entrySet()) {
             subtasksCommittableManagers.merge(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
index 1b3d5bffc81..801c8446850 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
@@ -20,7 +20,7 @@ package 
org.apache.flink.streaming.runtime.operators.sink.committables;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.connector.sink2.InitContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -157,8 +157,7 @@ public class CommittableCollector<CommT> {
     }
 
     /**
-     * Returns {@link CheckpointCommittableManager} that is currently hold by 
the collector and
-     * associated with the {@link CommittableCollector#EOI} checkpoint id.
+     * Returns {@link CommittableManager} belonging to the last input.
      *
      * @return {@link CheckpointCommittableManager}
      */
@@ -235,12 +234,12 @@ public class CommittableCollector<CommT> {
     private void addSummary(CommittableSummary<CommT> summary) {
         checkpointCommittables
                 .computeIfAbsent(
-                        summary.getCheckpointId().orElse(EOI),
+                        summary.getCheckpointIdOrEOI(),
                         key ->
                                 new CheckpointCommittableManagerImpl<>(
                                         subtaskId,
                                         numberOfSubtasks,
-                                        summary.getCheckpointId().orElse(EOI),
+                                        summary.getCheckpointIdOrEOI(),
                                         metricGroup))
                 .upsertSummary(summary);
     }
@@ -252,7 +251,7 @@ public class CommittableCollector<CommT> {
     private CheckpointCommittableManagerImpl<CommT> getCheckpointCommittables(
             CommittableMessage<CommT> committable) {
         CheckpointCommittableManagerImpl<CommT> committables =
-                
this.checkpointCommittables.get(committable.getCheckpointId().orElse(EOI));
+                
this.checkpointCommittables.get(committable.getCheckpointIdOrEOI());
         return checkNotNull(committables, "Unknown checkpoint for %s", 
committable);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
index 1baadeb40a8..381cec977f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittableManager.java
@@ -22,8 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
 import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -42,7 +40,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 class SubtaskCommittableManager<CommT> {
     private final Deque<CommitRequestImpl<CommT>> requests;
     private int numExpectedCommittables;
-    @Nullable private final Long checkpointId;
+    private final long checkpointId;
     private final int subtaskId;
     private int numDrained;
     private int numFailed;
@@ -51,7 +49,7 @@ class SubtaskCommittableManager<CommT> {
     SubtaskCommittableManager(
             int numExpectedCommittables,
             int subtaskId,
-            @Nullable Long checkpointId,
+            long checkpointId,
             SinkCommitterMetricGroup metricGroup) {
         this(
                 Collections.emptyList(),
@@ -69,7 +67,7 @@ class SubtaskCommittableManager<CommT> {
             int numDrained,
             int numFailed,
             int subtaskId,
-            @Nullable Long checkpointId,
+            long checkpointId,
             SinkCommitterMetricGroup metricGroup) {
         this.checkpointId = checkpointId;
         this.subtaskId = subtaskId;
@@ -179,8 +177,7 @@ class SubtaskCommittableManager<CommT> {
     }
 
     @VisibleForTesting
-    @Nullable
-    Long getCheckpointId() {
+    long getCheckpointId() {
         return checkpointId;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java
index b3850b41935..4c7b0d7096c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java
@@ -40,7 +40,7 @@ class CommittableMessageSerializerTest {
         assertThat(message).isInstanceOf(CommittableWithLineage.class);
         final CommittableWithLineage<Integer> copy = 
(CommittableWithLineage<Integer>) message;
         assertThat(copy.getCommittable()).isEqualTo(1);
-        assertThat(copy.getCheckpointId()).isPresent().hasValue(2L);
+        assertThat(copy.getCheckpointIdOrEOI()).isEqualTo(2L);
         assertThat(copy.getSubtaskId()).isEqualTo(3);
     }
 
@@ -56,7 +56,7 @@ class CommittableMessageSerializerTest {
         final CommittableSummary<Integer> copy = (CommittableSummary<Integer>) 
message;
         assertThat(copy.getSubtaskId()).isEqualTo(1);
         assertThat(copy.getNumberOfSubtasks()).isEqualTo(2);
-        assertThat(copy.getCheckpointId()).isPresent().hasValue(3L);
+        assertThat(copy.getCheckpointIdOrEOI()).isEqualTo(3L);
         assertThat(copy.getNumberOfCommittables()).isEqualTo(4);
         assertThat(copy.getNumberOfPendingCommittables()).isEqualTo(5);
         assertThat(copy.getNumberOfFailedCommittables()).isEqualTo(6);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
index fb1062a5091..e02e838aeb3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableSummaryAssert.java
@@ -20,8 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2;
 
 import org.assertj.core.api.AbstractAssert;
 
-import javax.annotation.Nullable;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Custom assertions for {@link CommittableSummary}. */
@@ -75,13 +73,9 @@ public class CommittableSummaryAssert
         return this;
     }
 
-    public CommittableSummaryAssert hasCheckpointId(@Nullable Long 
checkpointId) {
+    public CommittableSummaryAssert hasCheckpointId(long checkpointId) {
         isNotNull();
-        if (checkpointId == null) {
-            assertThat(actual.getCheckpointId()).isEmpty();
-        } else {
-            assertThat(actual.getCheckpointId()).hasValue(checkpointId);
-        }
+        assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId);
         return this;
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
index 966b4330c19..0937c7454d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableWithLinageAssert.java
@@ -20,8 +20,6 @@ package org.apache.flink.streaming.api.connector.sink2;
 
 import org.assertj.core.api.AbstractAssert;
 
-import javax.annotation.Nullable;
-
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -38,7 +36,8 @@ public class CommittableWithLinageAssert
     public CommittableWithLinageAssert isEqualTo(CommittableWithLineage<?> 
committableWithLineage) {
         isNotNull();
         
assertThat(actual.getSubtaskId()).isEqualTo(committableWithLineage.getSubtaskId());
-        
assertThat(actual.getCheckpointId()).isEqualTo(committableWithLineage.getCheckpointId());
+        assertThat(actual.getCheckpointIdOrEOI())
+                .isEqualTo(committableWithLineage.getCheckpointIdOrEOI());
         
assertThat(actual.getCommittable()).isEqualTo(committableWithLineage.getCommittable());
         return this;
     }
@@ -49,13 +48,9 @@ public class CommittableWithLinageAssert
         return this;
     }
 
-    public CommittableWithLinageAssert hasCheckpointId(@Nullable Long 
checkpointId) {
+    public CommittableWithLinageAssert hasCheckpointId(long checkpointId) {
         isNotNull();
-        if (checkpointId == null) {
-            assertThat(actual.getCheckpointId()).isEmpty();
-        } else {
-            assertThat(actual.getCheckpointId()).hasValue(checkpointId);
-        }
+        assertThat(actual.getCheckpointIdOrEOI()).isEqualTo(checkpointId);
         return this;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
index 441a2b437a0..dc45e939b1c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class GlobalCommitterOperatorTest {
@@ -97,15 +98,15 @@ class GlobalCommitterOperatorTest {
         testHarness.open();
 
         final CommittableSummary<Integer> committableSummary =
-                new CommittableSummary<>(1, 2, null, 1, 1, 0);
+                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
         final CommittableSummary<Integer> committableSummary2 =
-                new CommittableSummary<>(2, 2, null, 1, 1, 0);
+                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary2));
 
-        final CommittableWithLineage<Integer> first = new 
CommittableWithLineage<>(1, null, 1);
+        final CommittableWithLineage<Integer> first = new 
CommittableWithLineage<>(1, EOI, 1);
         testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<Integer> second = new 
CommittableWithLineage<>(2, null, 2);
+        final CommittableWithLineage<Integer> second = new 
CommittableWithLineage<>(2, EOI, 2);
         testHarness.processElement(new StreamRecord<>(second));
 
         testHarness.endInput();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
index 5af5efed1a9..3571e60f2c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
 
 import org.junit.jupiter.api.Test;
 
+import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static org.assertj.core.api.Assertions.assertThat;
 
 class CommittableCollectorTest {
@@ -50,13 +51,12 @@ class CommittableCollectorTest {
     void testGetEndOfInputCommittable() {
         final CommittableCollector<Integer> committableCollector =
                 new CommittableCollector<>(1, 1, METRIC_GROUP);
-        CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 
null, 1, 0, 0);
+        CommittableSummary<Integer> first = new CommittableSummary<>(1, 1, 
EOI, 1, 0, 0);
         committableCollector.addMessage(first);
 
         CommittableManager<Integer> endOfInputCommittable =
                 committableCollector.getEndOfInputCommittable();
         assertThat(endOfInputCommittable).isNotNull();
-        SinkV2Assertions.assertThat(endOfInputCommittable.getSummary())
-                .hasCheckpointId(Long.MAX_VALUE);
+        
SinkV2Assertions.assertThat(endOfInputCommittable.getSummary()).hasCheckpointId(EOI);
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
index d1269ed89c1..6e7674b47a9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.List;
 import java.util.function.IntSupplier;
 
+import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableSummary;
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.toCommittableWithLinage;
@@ -171,15 +172,15 @@ abstract class CommitterOperatorTestBase {
         testHarness.open();
 
         final CommittableSummary<String> committableSummary =
-                new CommittableSummary<>(1, 2, null, 1, 1, 0);
+                new CommittableSummary<>(1, 2, EOI, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary));
         final CommittableSummary<String> committableSummary2 =
-                new CommittableSummary<>(2, 2, null, 1, 1, 0);
+                new CommittableSummary<>(2, 2, EOI, 1, 1, 0);
         testHarness.processElement(new StreamRecord<>(committableSummary2));
 
-        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", null, 1);
+        final CommittableWithLineage<String> first = new 
CommittableWithLineage<>("1", EOI, 1);
         testHarness.processElement(new StreamRecord<>(first));
-        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("1", null, 2);
+        final CommittableWithLineage<String> second = new 
CommittableWithLineage<>("1", EOI, 2);
         testHarness.processElement(new StreamRecord<>(second));
 
         testHarness.endInput();
@@ -330,11 +331,7 @@ abstract class CommitterOperatorTestBase {
     CommittableWithLineage<?> copyCommittableWithDifferentOrigin(
             CommittableWithLineage<?> committable, int subtaskId) {
         return new CommittableWithLineage<>(
-                committable.getCommittable(),
-                committable.getCheckpointId().isPresent()
-                        ? committable.getCheckpointId().getAsLong()
-                        : null,
-                subtaskId);
+                committable.getCommittable(), 
committable.getCheckpointIdOrEOI(), subtaskId);
     }
 
     private OneInputStreamOperatorTestHarness<
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
index ecd7a3e54f3..4ede8a6ee83 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.java
@@ -54,8 +54,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.lang.reflect.Proxy;
 import java.util.ArrayList;
@@ -71,6 +69,7 @@ import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import static 
org.apache.flink.streaming.api.connector.sink2.CommittableMessage.EOI;
 import static 
org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil.fromOutput;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -178,7 +177,7 @@ abstract class SinkWriterOperatorTestBase {
 
         testHarness.processElement(1, 1);
         testHarness.endInput();
-        assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE);
+        assertBasicOutput(testHarness.getOutput(), 1, EOI);
     }
 
     @ParameterizedTest
@@ -222,7 +221,7 @@ abstract class SinkWriterOperatorTestBase {
         restoredTestHarness.notifyOfCompletedCheckpoint(checkpointId);
 
         if (stateful) {
-            assertBasicOutput(restoredTestHarness.getOutput(), 2, 
Long.MAX_VALUE);
+            assertBasicOutput(restoredTestHarness.getOutput(), 2, EOI);
         } else {
             
assertThat(fromOutput(restoredTestHarness.getOutput()).get(0).asRecord().getValue())
                     .isInstanceOf(CommittableSummary.class)
@@ -565,9 +564,7 @@ abstract class SinkWriterOperatorTestBase {
     }
 
     private static void assertBasicOutput(
-            Collection<Object> queuedOutput,
-            int numberOfCommittables,
-            @Nullable Long checkpointId) {
+            Collection<Object> queuedOutput, int numberOfCommittables, long 
checkpointId) {
         List<StreamElement> output = fromOutput(queuedOutput);
         assertThat(output).hasSize(numberOfCommittables + 1);
         assertThat(output.get(0).asRecord().getValue())

Reply via email to