This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 21ba91e9f8 [flink] Make PostponeBucketSink no state and no intended failure (#5746)
21ba91e9f8 is described below
commit 21ba91e9f8af4c5937ed9f31928147ca0688d5eb
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jun 16 10:20:11 2025 +0800
[flink] Make PostponeBucketSink no state and no intended failure (#5746)
---
.../paimon/flink/sink/cdc/CdcAppendTableSink.java | 7 +++
.../sink/cdc/CdcAppendTableWriteOperator.java | 13 +++++
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 2 +-
.../apache/paimon/flink/sink/AppendTableSink.java | 2 +
.../apache/paimon/flink/sink/FlinkSinkBuilder.java | 2 +-
.../apache/paimon/flink/sink/FlinkWriteSink.java | 50 +++++++++++++++++++
.../paimon/flink/sink/PostponeBucketSink.java} | 34 ++++++-------
.../RestoreAndFailCommittableStateManager.java | 56 ++--------------------
...er.java => RestoreCommittableStateManager.java} | 28 ++---------
.../paimon/flink/sink/RowAppendTableSink.java | 40 ++++------------
.../sink/BatchWriteGeneratorTagOperatorTest.java | 4 +-
.../paimon/flink/sink/CommitterOperatorTest.java | 2 +-
.../paimon/flink/sink/StoreMultiCommitterTest.java | 2 +-
13 files changed, 112 insertions(+), 130 deletions(-)
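
A rough sketch of the recovery split this commit introduces, for orientation only (the sketch is not
part of the commit; the two sketch classes are placeholders that mirror the real
RestoreCommittableStateManager / RestoreAndFailCommittableStateManager shown in the diff below):

    import org.apache.paimon.flink.sink.Committer;

    import java.util.List;

    // Restore-only recovery (new RestoreCommittableStateManager): commit the restored
    // committables and report how many were committed.
    class RestoreOnlyRecoverySketch<T> {
        protected int recover(List<T> committables, Committer<?, T> committer) throws Exception {
            // mirrors committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState)
            return committer.filterAndCommit(committables, true, true);
        }
    }

    // Restore-and-fail recovery (RestoreAndFailCommittableStateManager): after committing,
    // fail on purpose so that restarted writers start from the newly committed snapshots.
    class RestoreAndFailRecoverySketch<T> extends RestoreOnlyRecoverySketch<T> {
        @Override
        protected int recover(List<T> committables, Committer<?, T> committer) throws Exception {
            int numCommitted = super.recover(committables, committer);
            if (numCommitted > 0) {
                throw new RuntimeException(
                        "Intentional failure after committing restored checkpoints");
            }
            return numCommitted;
        }
    }

PostponeBucketSink, RowAppendTableSink and the CDC append sink switch to the restore-only variant
and to NoopStoreSinkWriteState, matching the title's "no state and no intended failure".
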
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
index d43690e5b2..7a5b2a0f11 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
@@ -19,8 +19,10 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.FlinkWriteSink;
import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -52,4 +54,9 @@ public class CdcAppendTableSink extends FlinkWriteSink<CdcRecord> {
            DataStream<CdcRecord> input, String initialCommitUser, @Nullable Integer parallelism) {
        return super.doWrite(input, initialCommitUser, this.parallelism);
    }
+
+    @Override
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
+    }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
index 96b9fefcdf..b3e4153727 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableWriteOperator.java
@@ -19,11 +19,14 @@
package org.apache.paimon.flink.sink.cdc;
import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.NoopStoreSinkWriteState;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;
+import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -40,6 +43,16 @@ public class CdcAppendTableWriteOperator extends CdcRecordStoreWriteOperator {
        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
    }
+    @Override
+    protected StoreSinkWriteState createState(
+            int subtaskId,
+            StateInitializationContext context,
+            StoreSinkWriteState.StateValueFilter stateFilter) {
+        // No conflicts will occur in append only unaware bucket writer, so no state
+        // is needed.
+        return new NoopStoreSinkWriteState(subtaskId, stateFilter);
+    }
+
    @Override
    public void processElement(StreamRecord<CdcRecord> element) throws Exception {
        // only accepts INSERT record
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 0cd2638179..a67e74ef55 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -177,6 +177,6 @@ public class FlinkCdcMultiTableSink implements Serializable {
    protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() {
        return new RestoreAndFailCommittableStateManager<>(
-                WrappedManifestCommittableSerializer::new);
+                WrappedManifestCommittableSerializer::new, true);
    }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
index 24d0e2db6f..094d61f6ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
@@ -48,6 +48,8 @@ import static org.apache.paimon.flink.utils.ParallelismUtils.setParallelism;
 */
public abstract class AppendTableSink<T> extends FlinkWriteSink<T> {
+    private static final long serialVersionUID = 1L;
+
    protected final FileStoreTable table;
    protected final LogSinkFunction logSinkFunction;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 7f211870b9..3484210014 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -300,7 +300,7 @@ public class FlinkSinkBuilder {
            channelComputer = new PostponeBucketChannelComputer(table.schema());
        }
        DataStream<InternalRow> partitioned = partition(input, channelComputer, parallelism);
-        FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, null);
+        PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition);
        return sink.sinkFrom(partitioned);
    }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 74f1febd90..8dc2044734 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -18,11 +18,17 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+
import javax.annotation.Nullable;
import java.util.Map;
@@ -63,4 +69,48 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
                ManifestCommittableSerializer::new,
                options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
    }
+
+    protected static OneInputStreamOperatorFactory<InternalRow, Committable>
+            createNoStateRowWriteOperatorFactory(
+                    FileStoreTable table,
+                    LogSinkFunction logSinkFunction,
+                    StoreSinkWrite.Provider writeProvider,
+                    String commitUser) {
+        return new RowDataStoreWriteOperator.Factory(
+                table, logSinkFunction, writeProvider, commitUser) {
+            @Override
+            @SuppressWarnings("unchecked, rawtypes")
+            public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
+                return new RowDataStoreWriteOperator(
+                        parameters, table, logSinkFunction, writeProvider, commitUser) {
+
+                    @Override
+                    protected StoreSinkWriteState createState(
+                            int subtaskId,
+                            StateInitializationContext context,
+                            StoreSinkWriteState.StateValueFilter stateFilter) {
+                        // No conflicts will occur in append only unaware bucket writer, so no state
+                        // is needed.
+                        return new NoopStoreSinkWriteState(subtaskId, stateFilter);
+                    }
+
+                    @Override
+                    protected String getCommitUser(StateInitializationContext context)
+                            throws Exception {
+                        // No conflicts will occur in append only unaware bucket writer, so
+                        // commitUser does not matter.
+                        return commitUser;
+                    }
+                };
+            }
+        };
+    }
+
+    protected static CommittableStateManager<ManifestCommittable>
+            createRestoreOnlyCommittableStateManager(FileStoreTable table) {
+        Options options = table.coreOptions().toConfiguration();
+        return new RestoreCommittableStateManager<>(
+                ManifestCommittableSerializer::new,
+                options.get(PARTITION_MARK_DONE_RECOVER_FROM_STATE));
+    }
}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java
similarity index 51%
copy from paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java
index d43690e5b2..1ab6f398c5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcAppendTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeBucketSink.java
@@ -16,40 +16,36 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.cdc;
+package org.apache.paimon.flink.sink;
-import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.FlinkWriteSink;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import javax.annotation.Nullable;
-/**
- * CDC Sink for unaware bucket table. It should not add compaction node, because the compaction may
- * have old schema.
- */
-public class CdcAppendTableSink extends FlinkWriteSink<CdcRecord> {
+import java.util.Map;
+
+/** {@link FlinkSink} for writing records into fixed bucket Paimon table. */
+public class PostponeBucketSink extends FlinkWriteSink<InternalRow> {
-    private final Integer parallelism;
+    private static final long serialVersionUID = 1L;
-    public CdcAppendTableSink(FileStoreTable table, Integer parallelism) {
-        super(table, null);
-        this.parallelism = parallelism;
+    public PostponeBucketSink(
+            FileStoreTable table, @Nullable Map<String, String> overwritePartition) {
+        super(table, overwritePartition);
    }
    @Override
-    protected OneInputStreamOperatorFactory<CdcRecord, Committable> createWriteOperatorFactory(
+    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
            StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcAppendTableWriteOperator.Factory(table, writeProvider, commitUser);
+        return createNoStateRowWriteOperatorFactory(table, null, writeProvider, commitUser);
    }
    @Override
-    public DataStream<Committable> doWrite(
-            DataStream<CdcRecord> input, String initialCommitUser, @Nullable Integer parallelism) {
-        return super.doWrite(input, initialCommitUser, this.parallelism);
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
    }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index a9b0922d0b..8556cb6e77 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -19,18 +19,9 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.serializer.VersionedSerializer;
-import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.utils.SerializableSupplier;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
-
-import java.util.ArrayList;
import java.util.List;
/**
@@ -44,52 +35,20 @@ import java.util.List;
 * store writers.
 */
public class RestoreAndFailCommittableStateManager<GlobalCommitT>
-        implements CommittableStateManager<GlobalCommitT> {
+        extends RestoreCommittableStateManager<GlobalCommitT> {
    private static final long serialVersionUID = 1L;
-    /** The committable's serializer. */
-    private final SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer;
-
-    private final boolean partitionMarkDoneRecoverFromState;
-
-    /** GlobalCommitT state of this job. Used to filter out previous successful commits. */
-    private ListState<GlobalCommitT> streamingCommitterState;
-
-    public RestoreAndFailCommittableStateManager(
-            SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer) {
-        this(committableSerializer, true);
-    }
-
    public RestoreAndFailCommittableStateManager(
            SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer,
            boolean partitionMarkDoneRecoverFromState) {
-        this.committableSerializer = committableSerializer;
-        this.partitionMarkDoneRecoverFromState = partitionMarkDoneRecoverFromState;
+        super(committableSerializer, partitionMarkDoneRecoverFromState);
    }
    @Override
-    public void initializeState(
-            StateInitializationContext context, Committer<?, GlobalCommitT> committer)
+    protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
            throws Exception {
-        streamingCommitterState =
-                new SimpleVersionedListState<>(
-                        context.getOperatorStateStore()
-                                .getListState(
-                                        new ListStateDescriptor<>(
-                                                "streaming_committer_raw_states",
-                                                BytePrimitiveArraySerializer.INSTANCE)),
-                        new VersionedSerializerWrapper<>(committableSerializer.get()));
-        List<GlobalCommitT> restored = new ArrayList<>();
-        streamingCommitterState.get().forEach(restored::add);
-        streamingCommitterState.clear();
-        recover(restored, committer);
-    }
-
-    private void recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
-            throws Exception {
-        int numCommitted =
-                committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState);
+        int numCommitted = super.recover(committables, committer);
        if (numCommitted > 0) {
            throw new RuntimeException(
                    "This exception is intentionally thrown "
@@ -97,11 +56,6 @@ public class RestoreAndFailCommittableStateManager<GlobalCommitT>
                            + "By restarting the job we hope that "
                            + "writers can start writing based on these new commits.");
        }
-    }
-
-    @Override
-    public void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables)
-            throws Exception {
-        streamingCommitterState.update(committables);
+        return numCommitted;
    }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java
similarity index 75%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java
index a9b0922d0b..b1ed396bdc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreCommittableStateManager.java
@@ -36,14 +36,9 @@ import java.util.List;
/**
 * A {@link CommittableStateManager} which stores uncommitted {@link ManifestCommittable}s in state.
 *
- * <p>When the job restarts, these {@link ManifestCommittable}s will be restored and committed, then
- * an intended failure will occur, hoping that after the job restarts, all writers can start writing
- * based on the restored snapshot.
- *
- * <p>Useful for committing snapshots containing records. For example snapshots produced by table
- * store writers.
+ * <p>When the job restarts, these {@link ManifestCommittable}s will be restored and committed.
 */
-public class RestoreAndFailCommittableStateManager<GlobalCommitT>
+public class RestoreCommittableStateManager<GlobalCommitT>
        implements CommittableStateManager<GlobalCommitT> {
    private static final long serialVersionUID = 1L;
@@ -56,12 +51,7 @@ public class RestoreAndFailCommittableStateManager<GlobalCommitT>
    /** GlobalCommitT state of this job. Used to filter out previous successful commits. */
    private ListState<GlobalCommitT> streamingCommitterState;
-    public RestoreAndFailCommittableStateManager(
-            SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer) {
-        this(committableSerializer, true);
-    }
-
-    public RestoreAndFailCommittableStateManager(
+    public RestoreCommittableStateManager(
            SerializableSupplier<VersionedSerializer<GlobalCommitT>> committableSerializer,
            boolean partitionMarkDoneRecoverFromState) {
        this.committableSerializer = committableSerializer;
@@ -86,17 +76,9 @@ public class RestoreAndFailCommittableStateManager<GlobalCommitT>
        recover(restored, committer);
    }
-    private void recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
+    protected int recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer)
            throws Exception {
-        int numCommitted =
-                committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState);
-        if (numCommitted > 0) {
-            throw new RuntimeException(
-                    "This exception is intentionally thrown "
-                            + "after committing the restored checkpoints. "
-                            + "By restarting the job we hope that "
-                            + "writers can start writing based on these new commits.");
-        }
+        return committer.filterAndCommit(committables, true, partitionMarkDoneRecoverFromState);
    }
    @Override
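
With the split above, the failing variant now always takes the boolean explicitly (its
single-argument constructor was removed), while the stateless sinks obtain the restore-only variant
through FlinkWriteSink#createRestoreOnlyCommittableStateManager. A minimal construction sketch
follows; it is an illustration only and the wrapper class name is a placeholder, not code from the
commit:

    import org.apache.paimon.flink.sink.CommittableStateManager;
    import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
    import org.apache.paimon.flink.sink.RestoreCommittableStateManager;
    import org.apache.paimon.manifest.ManifestCommittable;
    import org.apache.paimon.manifest.ManifestCommittableSerializer;

    class CommittableStateManagerChoice {
        // Keep the restore-then-fail behavior (e.g. FlinkCdcMultiTableSink and the tests below).
        CommittableStateManager<ManifestCommittable> failing(boolean partitionMarkDoneRecoverFromState) {
            return new RestoreAndFailCommittableStateManager<>(
                    ManifestCommittableSerializer::new, partitionMarkDoneRecoverFromState);
        }

        // Restore and commit only, no intended failure (append / postpone bucket sinks).
        CommittableStateManager<ManifestCommittable> restoreOnly(boolean partitionMarkDoneRecoverFromState) {
            return new RestoreCommittableStateManager<>(
                    ManifestCommittableSerializer::new, partitionMarkDoneRecoverFromState);
        }
    }
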
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
index 69a339a411..a58839a841 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowAppendTableSink.java
@@ -19,18 +19,18 @@
package org.apache.paimon.flink.sink;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import java.util.Map;
/** An {@link AppendTableSink} which handles {@link InternalRow}. */
public class RowAppendTableSink extends AppendTableSink<InternalRow> {
+    private static final long serialVersionUID = 1L;
+
    public RowAppendTableSink(
            FileStoreTable table,
            Map<String, String> overwritePartitions,
@@ -42,34 +42,12 @@ public class RowAppendTableSink extends AppendTableSink<InternalRow> {
    @Override
    protected OneInputStreamOperatorFactory<InternalRow, Committable> createWriteOperatorFactory(
            StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new RowDataStoreWriteOperator.Factory(
-                table, logSinkFunction, writeProvider, commitUser) {
-            @Override
-            @SuppressWarnings("unchecked, rawtypes")
-            public StreamOperator createStreamOperator(StreamOperatorParameters parameters) {
-                return new RowDataStoreWriteOperator(
-                        parameters, table, logSinkFunction, writeProvider, commitUser) {
-
-                    @Override
-                    protected StoreSinkWriteState createState(
-                            int subtaskId,
-                            StateInitializationContext context,
-                            StoreSinkWriteState.StateValueFilter stateFilter)
-                            throws Exception {
-                        // No conflicts will occur in append only unaware bucket writer, so no state
-                        // is needed.
-                        return new NoopStoreSinkWriteState(subtaskId, stateFilter);
-                    }
+        return createNoStateRowWriteOperatorFactory(
+                table, logSinkFunction, writeProvider, commitUser);
+    }
-                    @Override
-                    protected String getCommitUser(StateInitializationContext context)
-                            throws Exception {
-                        // No conflicts will occur in append only unaware bucket writer, so
-                        // commitUser does not matter.
-                        return commitUser;
-                    }
-                };
-            }
-        };
+    @Override
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+        return createRestoreOnlyCommittableStateManager(table);
    }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
index 32ee2f42af..1bc68477a3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java
@@ -67,7 +67,7 @@ public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest {
                        table,
                        initialCommitUser,
                        new RestoreAndFailCommittableStateManager<>(
-                                ManifestCommittableSerializer::new));
+                                ManifestCommittableSerializer::new, true));
        OneInputStreamOperator<Committable, Committable> committerOperator =
                committerOperatorFactory.createStreamOperator(
@@ -143,7 +143,7 @@ public class BatchWriteGeneratorTagOperatorTest extends CommitterOperatorTest {
                        table,
                        initialCommitUser,
                        new RestoreAndFailCommittableStateManager<>(
-                                ManifestCommittableSerializer::new));
+                                ManifestCommittableSerializer::new, true));
        OneInputStreamOperator<Committable, Committable> committerOperator =
                committerOperatorFactory.createStreamOperator(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 4ad1dff9aa..220af9a73b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -665,7 +665,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                        table,
                        null,
                        new RestoreAndFailCommittableStateManager<>(
-                                ManifestCommittableSerializer::new));
+                                ManifestCommittableSerializer::new, true));
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
                createTestHarness(operatorFactory);
        testHarness.open();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index a61a379bde..041a692d9e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -652,7 +652,7 @@ class StoreMultiCommitterTest {
                        initialCommitUser,
                        context -> new StoreMultiCommitter(catalogLoader, context),
                        new RestoreAndFailCommittableStateManager<>(
-                                WrappedManifestCommittableSerializer::new));
+                                WrappedManifestCommittableSerializer::new, true));
        return createTestHarness(operator);
    }