This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.2 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 9b4791014a3fe4dcaeb714e5e62c945f8173ef1d Author: Jingsong Lee <[email protected]> AuthorDate: Mon Jun 16 10:20:11 2025 +0800 [flink] Make PostponeBucketSink no state and no intended failure (#5746) --- .../paimon/flink/sink/cdc/CdcAppendTableSink.java | 7 +++ .../sink/cdc/CdcAppendTableWriteOperator.java | 13 +++++ .../flink/sink/cdc/FlinkCdcMultiTableSink.java | 2 +- .../apache/paimon/flink/sink/AppendTableSink.java | 2 + .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 2 +- .../apache/paimon/flink/sink/FlinkWriteSink.java | 50 +++++++++++++++++++ .../paimon/flink/sink/PostponeBucketSink.java} | 34 ++++++------- .../RestoreAndFailCommittableStateManager.java | 56 ++-------------------- ...er.java => RestoreCommittableStateManager.java} | 28 ++--------- .../paimon/flink/sink/RowAppendTableSink.java | 40 ++++------------ .../sink/BatchWriteGeneratorTagOperatorTest.java | 4 +- .../paimon/flink/sink/CommitterOperatorTest.java | 2 +- .../paimon/flink/sink/StoreMultiCommitterTest.java | 2 +- 13 files changed, 112 insertions(+), 130 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java index d43690e5b2..7a5b2a0f11 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java @@ -19,8 +19,10 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableStateManager; import org.apache.paimon.flink.sink.FlinkWriteSink; import org.apache.paimon.flink.sink.StoreSinkWrite; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.streaming.api.datastream.DataStream; @@ -52,4 +54,9 @@ public 
class CdcAppendTableSink extends FlinkWriteSink<CdcRecord> { DataStream<CdcRecord> input, String initialCommitUser, @Nullable Integer parallelism) { return super.doWrite(input, initialCommitUser, this.parallelism); } + + @Override + protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() { + return createRestoreOnlyCommittableStateManager(table); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java index 96b9fefcdf..b3e4153727 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java @@ -19,11 +19,14 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.NoopStoreSinkWriteState; import org.apache.paimon.flink.sink.PrepareCommitOperator; import org.apache.paimon.flink.sink.StoreSinkWrite; +import org.apache.paimon.flink.sink.StoreSinkWriteState; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.RowKind; +import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; @@ -40,6 +43,16 @@ public class CdcAppendTableWriteOperator extends CdcRecordStoreWriteOperator { super(parameters, table, storeSinkWriteProvider, initialCommitUser); } + @Override + protected StoreSinkWriteState createState( + int subtaskId, + StateInitializationContext context, + StoreSinkWriteState.StateValueFilter stateFilter) { + // No conflicts will occur in append only unaware bucket writer, so 
no state + // is needed. + return new NoopStoreSinkWriteState(subtaskId, stateFilter); + } + @Override public void processElement(StreamRecord<CdcRecord> element) throws Exception { // only accepts INSERT record diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 0cd2638179..a67e74ef55 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -177,6 +177,6 @@ public class FlinkCdcMultiTableSink implements Serializable { protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() { return new RestoreAndFailCommittableStateManager<>( - WrappedManifestCommittableSerializer::new); + WrappedManifestCommittableSerializer::new, true); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java index 24d0e2db6f..094d61f6ea 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java @@ -48,6 +48,8 @@ import static org.apache.paimon.flink.utils.ParallelismUtils.setParallelism; */ public abstract class AppendTableSink<T> extends FlinkWriteSink<T> { + private static final long serialVersionUID = 1L; + protected final FileStoreTable table; protected final LogSinkFunction logSinkFunction; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java 
index 7f211870b9..3484210014 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -300,7 +300,7 @@ public class FlinkSinkBuilder { channelComputer = new PostponeBucketChannelComputer(table.schema()); } DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism); - FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, null); + PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition); return sink.sinkFrom(partitioned); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index 74f1febd90..8dc2044734 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -18,11 +18,17 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.data.InternalRow; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestCommittableSerializer; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + import javax.annotation.Nullable; import java.util.Map; @@ -63,4 +69,48 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> { ManifestCommittableSerializer::new, options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE)); } + + protected static OneInputStreamOperatorFactory<InternalRow, 
Committable> + createNoStateRowWriteOperatorFactory( + FileStoreTable table, + LogSinkFunction logSinkFunction, + StoreSinkWrite.Provider writeProvider, + String commitUser) { + return new RowDataStoreWriteOperator.Factory( + table, logSinkFunction, writeProvider, commitUser) { + @Override + @SuppressWarnings("unchecked, rawtypes") + public StreamOperator createStreamOperator(StreamOperatorParameters parameters) { + return new RowDataStoreWriteOperator( + parameters, table, logSinkFunction, writeProvider, commitUser) { + + @Override + protected StoreSinkWriteState createState( + int subtaskId, + StateInitializationContext context, + StoreSinkWriteState.StateValueFilter stateFilter) { + // No conflicts will occur in append only unaware bucket writer, so no state + // is needed. + return new NoopStoreSinkWriteState(subtaskId, stateFilter); + } + + @Override + protected String getCommitUser(StateInitializationContext context) + throws Exception { + // No conflicts will occur in append only unaware bucket writer, so + // commitUser does not matter. 
+ return commitUser; + } + }; + } + }; + } + + protected static CommittableStateManager<ManifestCommittable> + createRestoreOnlyCommittableStateManager(FileStoreTable table) { + Options options = table.coreOptions().toConfiguration(); + return new RestoreCommittableStateManager<>( + ManifestCommittableSerializer::new, + options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE)); + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java similarity index 51% copy from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java index d43690e5b2..1ab6f398c5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java @@ -16,40 +16,36 @@ * limitations under the License. */ -package org.apache.paimon.flink.sink.cdc; +package org.apache.paimon.flink.sink; -import org.apache.paimon.flink.sink.Committable; -import org.apache.paimon.flink.sink.FlinkWriteSink; -import org.apache.paimon.flink.sink.StoreSinkWrite; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; -import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import javax.annotation.Nullable; -/** - * CDC Sink for unaware bucket table. It should not add compaction node, because the compaction may - * have old schema. 
- */ -public class CdcAppendTableSink extends FlinkWriteSink<CdcRecord> { +import java.util.Map; + +/** {@link FlinkSink} for writing records into postpone bucket Paimon table. */ +public class PostponeBucketSink extends FlinkWriteSink<InternalRow> { - private final Integer parallelism; + private static final long serialVersionUID = 1L; - public CdcAppendTableSink(FileStoreTable table, Integer parallelism) { - super(table, null); - this.parallelism = parallelism; + public PostponeBucketSink( + FileStoreTable table, @Nullable Map<String, String> overwritePartition) { + super(table, overwritePartition); } @Override - protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory( + protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new CdcAppendTableWriteOperator.Factory(table, writeProvider, commitUser); + return createNoStateRowWriteOperatorFactory(table, null, writeProvider, commitUser); } @Override - public DataStream<Committable> doWrite( - DataStream<CdcRecord> input, String initialCommitUser, @Nullable Integer parallelism) { - return super.doWrite(input, initialCommitUser, this.parallelism); + protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() { + return createRestoreOnlyCommittableStateManager(table); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java index a9b0922d0b..8556cb6e77 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java @@ -19,18 +19,9 @@ package org.apache.paimon.flink.sink; 
import org.apache.paimon.data.serializer.VersionedSerializer; -import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.utils.SerializableSupplier; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; - -import java.util.ArrayList; import java.util.List; /** @@ -44,52 +35,20 @@ import java.util.List; * store writers. */ public class RestoreAndFailCommittableStateManager<GlobalCommitT> - implements CommittableStateManager<GlobalCommitT> { + extends RestoreCommittableStateManager<GlobalCommitT> { private static final long serialVersionUID = 1L; - /** The committable's serializer. */ - private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer; - - private final boolean partitionMarkDoneRecoverFromState; - - /** GlobalCommitT state of this job. Used to filter out previous successful commits. 
*/ - private ListState<GlobalCommitT> streamingCommitterState; - - public RestoreAndFailCommittableStateManager( - SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer) { - this(committableSerializer, true); - } - public RestoreAndFailCommittableStateManager( SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer, boolean partitionMarkDoneRecoverFromState) { - this.committableSerializer = committableSerializer; - this.partitionMarkDoneRecoverFromState = partitionMarkDoneRecoverFromState; + super(committableSerializer, partitionMarkDoneRecoverFromState); } @Override - public void initializeState( - StateInitializationContext context, Committer<?, GlobalCommitT> committer) + protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer) throws Exception { - streamingCommitterState = - new SimpleVersionedListState<>( - context.getOperatorStateStore() - .getListState( - new ListStateDescriptor<>( - "streaming_committer_raw_states", - BytePrimitiveArraySerializer.INSTANCE)), - new VersionedSerializerWrapper<>(committableSerializer.get())); - List<GlobalCommitT> restored = new ArrayList<>(); - streamingCommitterState.get().forEach(restored::add); - streamingCommitterState.clear(); - recover(restored, committer); - } - - private void recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer) - throws Exception { - int numCommitted = - committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState); + int numCommitted = super.recover(committables, committer); if (numCommitted > 0) { throw new RuntimeException( "This exception is intentionally thrown " @@ -97,11 +56,6 @@ public class RestoreAndFailCommittableStateManager<GlobalCommitT> + "By restarting the job we hope that " + "writers can start writing based on these new commits."); } - } - - @Override - public void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables) - throws 
Exception { - streamingCommitterState.update(committables); + return numCommitted; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java similarity index 75% copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java index a9b0922d0b..b1ed396bdc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java @@ -36,14 +36,9 @@ import java.util.List; /** * A {@link CommittableStateManager} which stores uncommitted {@link ManifestCommittable}s in state. * - * <p>When the job restarts, these {@link ManifestCommittable}s will be restored and committed, then - * an intended failure will occur, hoping that after the job restarts, all writers can start writing - * based on the restored snapshot. - * - * <p>Useful for committing snapshots containing records. For example snapshots produced by table - * store writers. + * <p>When the job restarts, these {@link ManifestCommittable}s will be restored and committed. */ -public class RestoreAndFailCommittableStateManager<GlobalCommitT> +public class RestoreCommittableStateManager<GlobalCommitT> implements CommittableStateManager<GlobalCommitT> { private static final long serialVersionUID = 1L; @@ -56,12 +51,7 @@ public class RestoreAndFailCommittableStateManager<GlobalCommitT> /** GlobalCommitT state of this job. Used to filter out previous successful commits. 
*/ private ListState<GlobalCommitT> streamingCommitterState; - public RestoreAndFailCommittableStateManager( - SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer) { - this(committableSerializer, true); - } - - public RestoreAndFailCommittableStateManager( + public RestoreCommittableStateManager( SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer, boolean partitionMarkDoneRecoverFromState) { this.committableSerializer = committableSerializer; @@ -86,17 +76,9 @@ public class RestoreAndFailCommittableStateManager<GlobalCommitT> recover(restored, committer); } - private void recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer) + protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer) throws Exception { - int numCommitted = - committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState); - if (numCommitted > 0) { - throw new RuntimeException( - "This exception is intentionally thrown " - + "after committing the restored checkpoints. 
" - + "By restarting the job we hope that " - + "writers can start writing based on these new commits."); - } + return committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java index 69a339a411..a58839a841 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java @@ -19,18 +19,18 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.FileStoreTable; -import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import java.util.Map; /** An {@link AppendTableSink} which handles {@link InternalRow}. 
*/ public class RowAppendTableSink extends AppendTableSink<InternalRow> { + private static final long serialVersionUID = 1L; + public RowAppendTableSink( FileStoreTable table, Map<String, String> overwritePartitions, @@ -42,34 +42,12 @@ public class RowAppendTableSink extends AppendTableSink<InternalRow> { @Override protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new RowDataStoreWriteOperator.Factory( - table, logSinkFunction, writeProvider, commitUser) { - @Override - @SuppressWarnings("unchecked, rawtypes") - public StreamOperator createStreamOperator(StreamOperatorParameters parameters) { - return new RowDataStoreWriteOperator( - parameters, table, logSinkFunction, writeProvider, commitUser) { - - @Override - protected StoreSinkWriteState createState( - int subtaskId, - StateInitializationContext context, - StoreSinkWriteState.StateValueFilter stateFilter) - throws Exception { - // No conflicts will occur in append only unaware bucket writer, so no state - // is needed. - return new NoopStoreSinkWriteState(subtaskId, stateFilter); - } + return createNoStateRowWriteOperatorFactory( + table, logSinkFunction, writeProvider, commitUser); + } - @Override - protected String getCommitUser(StateInitializationContext context) - throws Exception { - // No conflicts will occur in append only unaware bucket writer, so - // commitUser does not matter. 
- return commitUser; - } - }; - } - }; + @Override + protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() { + return createRestoreOnlyCommittableStateManager(table); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java index 32ee2f42af..1bc68477a3 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -67,7 +67,7 @@ public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest { table, initialCommitUser, new RestoreAndFailCommittableStateManager<>( - ManifestCommittableSerializer::new)); + ManifestCommittableSerializer::new, true)); OneInputStreamOperator<Committable, Committable> committerOperator = committerOperatorFactory.createStreamOperator( @@ -143,7 +143,7 @@ public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest { table, initialCommitUser, new RestoreAndFailCommittableStateManager<>( - ManifestCommittableSerializer::new)); + ManifestCommittableSerializer::new, true)); OneInputStreamOperator<Committable, Committable> committerOperator = committerOperatorFactory.createStreamOperator( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index 4ad1dff9aa..220af9a73b 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -665,7 +665,7 @@ public class 
CommitterOperatorTest extends CommitterOperatorTestBase { table, null, new RestoreAndFailCommittableStateManager<>( - ManifestCommittableSerializer::new)); + ManifestCommittableSerializer::new, true)); OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = createTestHarness(operatorFactory); testHarness.open(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java index a61a379bde..041a692d9e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java @@ -652,7 +652,7 @@ class StoreMultiCommitterTest { initialCommitUser, context -> new StoreMultiCommitter(catalogLoader, context), new RestoreAndFailCommittableStateManager<>( - WrappedManifestCommittableSerializer::new)); + WrappedManifestCommittableSerializer::new, true)); return createTestHarness(operator); }
