This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 406ab33d4 [flink] Remove state from append only unaware bucket writer 
(#4219)
406ab33d4 is described below

commit 406ab33d48c6c328732a74c38a27049e8d967045
Author: tsreaper <[email protected]>
AuthorDate: Tue Sep 24 09:32:24 2024 +0800

    [flink] Remove state from append only unaware bucket writer (#4219)
---
 .../org/apache/paimon/AppendOnlyFileStore.java     |  45 ++++++---
 .../paimon/operation/AbstractFileStoreWrite.java   |  70 ++++++++------
 .../paimon/operation/AppendOnlyFileStoreWrite.java | 105 +++++----------------
 .../AppendOnlyFixedBucketFileStoreWrite.java       | 104 ++++++++++++++++++++
 .../AppendOnlyUnawareBucketFileStoreWrite.java     |  89 +++++++++++++++++
 .../paimon/operation/KeyValueFileStoreWrite.java   |  10 +-
 .../paimon/operation/MemoryFileStoreWrite.java     |   2 -
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java |   3 +-
 .../sink/MultiTablesStoreCompactOperator.java      |   6 +-
 .../paimon/flink/sink/NoopStoreSinkWriteState.java |  54 +++++++++++
 .../paimon/flink/sink/RowUnawareBucketSink.java    |  21 ++++-
 .../paimon/flink/sink/StoreCompactOperator.java    |  45 ++++-----
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |   8 ++
 .../paimon/flink/sink/StoreSinkWriteState.java     |  97 ++-----------------
 ...riteState.java => StoreSinkWriteStateImpl.java} |  72 ++++----------
 .../paimon/flink/sink/TableWriteOperator.java      |  44 ++++-----
 .../flink/UnawareBucketAppendOnlyTableITCase.java  |  99 +++++++++++++++++++
 .../apache/paimon/flink/sink/FlinkSinkTest.java    |  30 +++---
 .../flink/sink/StoreCompactOperatorTest.java       |  50 ++++------
 19 files changed, 583 insertions(+), 371 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 289f0bde7..b0e3ea5f4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -25,6 +25,8 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.operation.AppendOnlyFixedBucketFileStoreWrite;
+import org.apache.paimon.operation.AppendOnlyUnawareBucketFileStoreWrite;
 import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.operation.ScanBucketFilter;
 import org.apache.paimon.predicate.Predicate;
@@ -94,21 +96,36 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
     @Override
     public AppendOnlyFileStoreWrite newWrite(
             String commitUser, ManifestCacheFilter manifestFilter) {
-        return new AppendOnlyFileStoreWrite(
-                fileIO,
-                newRead(),
-                schema.id(),
-                commitUser,
-                rowType,
-                pathFactory(),
-                snapshotManager(),
-                newScan(true).withManifestCacheFilter(manifestFilter),
-                options,
-                bucketMode(),
+        DeletionVectorsMaintainer.Factory dvMaintainerFactory =
                 options.deletionVectorsEnabled()
                         ? 
DeletionVectorsMaintainer.factory(newIndexFileHandler())
-                        : null,
-                tableName);
+                        : null;
+        if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
+            return new AppendOnlyUnawareBucketFileStoreWrite(
+                    fileIO,
+                    newRead(),
+                    schema.id(),
+                    rowType,
+                    pathFactory(),
+                    snapshotManager(),
+                    newScan(true).withManifestCacheFilter(manifestFilter),
+                    options,
+                    dvMaintainerFactory,
+                    tableName);
+        } else {
+            return new AppendOnlyFixedBucketFileStoreWrite(
+                    fileIO,
+                    newRead(),
+                    schema.id(),
+                    commitUser,
+                    rowType,
+                    pathFactory(),
+                    snapshotManager(),
+                    newScan(true).withManifestCacheFilter(manifestFilter),
+                    options,
+                    dvMaintainerFactory,
+                    tableName);
+        }
     }
 
     private AppendOnlyFileStoreScan newScan(boolean forWrite) {
@@ -129,7 +146,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                                         splitAnd(predicate),
                                         rowType.getFieldNames(),
                                         bucketKeyType.getFieldNames());
-                        if (bucketFilters.size() > 0) {
+                        if (!bucketFilters.isEmpty()) {
                             setBucketKeyFilter(and(bucketFilters));
                         }
                     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 195a0b48d..2e502e96f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
 
 import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
 
@@ -64,7 +65,6 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractFileStoreWrite.class);
 
-    private final String commitUser;
     protected final SnapshotManager snapshotManager;
     private final FileStoreScan scan;
     private final int writerNumberMax;
@@ -85,14 +85,12 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     private boolean isInsertOnly;
 
     protected AbstractFileStoreWrite(
-            String commitUser,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             @Nullable IndexMaintainer.Factory<T> indexFactory,
             @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
             String tableName,
             int writerNumberMax) {
-        this.commitUser = commitUser;
         this.snapshotManager = snapshotManager;
         this.scan = scan;
         this.indexFactory = indexFactory;
@@ -169,7 +167,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     @Override
     public List<CommitMessage> prepareCommit(boolean waitCompaction, long 
commitIdentifier)
             throws Exception {
-        long latestCommittedIdentifier;
+        Function<WriterContainer<T>, Boolean> writerCleanChecker;
         if (writers.values().stream()
                         .map(Map::values)
                         .flatMap(Collection::stream)
@@ -177,20 +175,10 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                         .max()
                         .orElse(Long.MIN_VALUE)
                 == Long.MIN_VALUE) {
-            // Optimization for the first commit.
-            //
-            // If this is the first commit, no writer has previous modified 
commit, so the value of
-            // `latestCommittedIdentifier` does not matter.
-            //
-            // Without this optimization, we may need to scan through all 
snapshots only to find
-            // that there is no previous snapshot by this user, which is very 
inefficient.
-            latestCommittedIdentifier = Long.MIN_VALUE;
+            // If this is the first commit, no writer should be cleaned.
+            writerCleanChecker = writerContainer -> false;
         } else {
-            latestCommittedIdentifier =
-                    snapshotManager
-                            .latestSnapshotOfUser(commitUser)
-                            .map(Snapshot::commitIdentifier)
-                            .orElse(Long.MIN_VALUE);
+            writerCleanChecker = createWriterCleanChecker();
         }
 
         List<CommitMessage> result = new ArrayList<>();
@@ -226,14 +214,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                 result.add(committable);
 
                 if (committable.isEmpty()) {
-                    // Condition 1: There is no more record waiting to be 
committed. Note that the
-                    // condition is < (instead of <=), because each commit 
identifier may have
-                    // multiple snapshots. We must make sure all snapshots of 
this identifier are
-                    // committed.
-                    // Condition 2: No compaction is in progress. That is, no 
more changelog will be
-                    // produced.
-                    if (writerContainer.lastModifiedCommitIdentifier < 
latestCommittedIdentifier
-                            && !writerContainer.writer.isCompacting()) {
+                    if (writerCleanChecker.apply(writerContainer)) {
                         // Clear writer if no update, and if its latest 
modification has committed.
                         //
                         // We need a mechanism to clear writers, otherwise 
there will be more and
@@ -242,12 +223,10 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                             LOG.debug(
                                     "Closing writer for partition {}, bucket 
{}. "
                                             + "Writer's last modified 
identifier is {}, "
-                                            + "while latest committed 
identifier is {}, "
-                                            + "current commit identifier is 
{}.",
+                                            + "while current commit identifier 
is {}.",
                                     partition,
                                     bucket,
                                     
writerContainer.lastModifiedCommitIdentifier,
-                                    latestCommittedIdentifier,
                                     commitIdentifier);
                         }
                         writerContainer.writer.close();
@@ -266,6 +245,41 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         return result;
     }
 
+    // This abstract function returns a whole function (instead of just a 
boolean value),
+    // because we do not want to introduce `commitUser` into this base class.
+    //
+    // For writers with no conflicts, `commitUser` might be some random value.
+    protected abstract Function<WriterContainer<T>, Boolean> 
createWriterCleanChecker();
+
+    protected static <T>
+            Function<WriterContainer<T>, Boolean> 
createConflictAwareWriterCleanChecker(
+                    String commitUser, SnapshotManager snapshotManager) {
+        long latestCommittedIdentifier =
+                snapshotManager
+                        .latestSnapshotOfUser(commitUser)
+                        .map(Snapshot::commitIdentifier)
+                        .orElse(Long.MIN_VALUE);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Latest committed identifier is {}", 
latestCommittedIdentifier);
+        }
+
+        // Condition 1: There is no more record waiting to be committed. Note 
that the
+        // condition is < (instead of <=), because each commit identifier may 
have
+        // multiple snapshots. We must make sure all snapshots of this 
identifier are
+        // committed.
+        //
+        // Condition 2: No compaction is in progress. That is, no more 
changelog will be
+        // produced.
+        return writerContainer ->
+                writerContainer.lastModifiedCommitIdentifier < 
latestCommittedIdentifier
+                        && !writerContainer.writer.isCompacting();
+    }
+
+    protected static <T>
+            Function<WriterContainer<T>, Boolean> 
createNoConflictAwareWriterCleanChecker() {
+        return writerContainer -> true;
+    }
+
     @Override
     public void close() throws Exception {
         for (Map<Integer, WriterContainer<T>> bucketWriters : 
writers.values()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 203a9ff35..40fe5dbfa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -21,10 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.AppendOnlyWriter;
-import org.apache.paimon.append.BucketedAppendCompactManager;
 import org.apache.paimon.compact.CompactManager;
-import org.apache.paimon.compact.NoopCompactManager;
-import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
@@ -36,10 +33,8 @@ import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.manifest.FileSource;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.ExceptionUtils;
@@ -64,7 +59,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
-public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow>
+public abstract class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
         implements BundleFileStoreWriter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AppendOnlyFileStoreWrite.class);
@@ -75,59 +70,30 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
     private final RowType rowType;
     private final FileFormat fileFormat;
     private final FileStorePathFactory pathFactory;
-    private final long targetFileSize;
-    private final int compactionMinFileNum;
-    private final int compactionMaxFileNum;
-    private final boolean commitForceCompact;
-    private final String fileCompression;
-    private final CompressOptions spillCompression;
-    private final boolean useWriteBuffer;
-    private final boolean spillable;
-    private final MemorySize maxDiskSize;
+
     private final SimpleColStatsCollector.Factory[] statsCollectors;
     private final FileIndexOptions fileIndexOptions;
-    private final BucketMode bucketMode;
     private boolean forceBufferSpill = false;
-    private final boolean skipCompaction;
 
     public AppendOnlyFileStoreWrite(
             FileIO fileIO,
             RawFileSplitRead read,
             long schemaId,
-            String commitUser,
             RowType rowType,
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
-            BucketMode bucketMode,
             @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
             String tableName) {
-        super(commitUser, snapshotManager, scan, options, null, 
dvMaintainerFactory, tableName);
+        super(snapshotManager, scan, options, null, dvMaintainerFactory, 
tableName);
         this.fileIO = fileIO;
         this.read = read;
         this.schemaId = schemaId;
         this.rowType = rowType;
         this.fileFormat = options.fileFormat();
         this.pathFactory = pathFactory;
-        this.bucketMode = bucketMode;
-        this.targetFileSize = options.targetFileSize(false);
-        this.compactionMinFileNum = options.compactionMinFileNum();
-        this.compactionMaxFileNum = options.compactionMaxFileNum().orElse(5);
-        this.commitForceCompact = options.commitForceCompact();
-        // AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act 
difference in
-        // unaware-bucket mode (no compaction and force empty-writer).
-        if (bucketMode == BucketMode.BUCKET_UNAWARE) {
-            super.withIgnorePreviousFiles(true);
-            this.skipCompaction = true;
-        } else {
-            this.skipCompaction = options.writeOnly();
-        }
-        this.fileCompression = options.fileCompression();
-        this.spillCompression = options.spillCompressOptions();
-        this.useWriteBuffer = options.useWriteBufferForAppend();
-        this.spillable = options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode);
-        this.maxDiskSize = options.writeBufferSpillDiskSize();
+
         this.statsCollectors =
                 StatsCollectorFactories.createStatsFactories(options, 
rowType.getFieldNames());
         this.fileIndexOptions = options.indexColumnsOptions();
@@ -143,50 +109,38 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
             @Nullable DeletionVectorsMaintainer dvMaintainer) {
-        CompactManager compactManager = new NoopCompactManager();
-        if (!skipCompaction) {
-            Function<String, DeletionVector> dvFactory =
-                    dvMaintainer != null
-                            ? f -> 
dvMaintainer.deletionVectorOf(f).orElse(null)
-                            : null;
-            compactManager =
-                    new BucketedAppendCompactManager(
-                            compactExecutor,
-                            restoredFiles,
-                            dvMaintainer,
-                            compactionMinFileNum,
-                            compactionMaxFileNum,
-                            targetFileSize,
-                            files -> compactRewrite(partition, bucket, 
dvFactory, files),
-                            compactionMetrics == null
-                                    ? null
-                                    : 
compactionMetrics.createReporter(partition, bucket));
-        }
-
         return new AppendOnlyWriter(
                 fileIO,
                 ioManager,
                 schemaId,
                 fileFormat,
-                targetFileSize,
+                options.targetFileSize(false),
                 rowType,
                 restoredMaxSeqNumber,
-                compactManager,
+                getCompactManager(partition, bucket, restoredFiles, 
compactExecutor, dvMaintainer),
                 // it is only for new files, no dv
                 files -> createFilesIterator(partition, bucket, files, null),
-                commitForceCompact,
+                options.commitForceCompact(),
                 pathFactory.createDataFilePathFactory(partition, bucket),
                 restoreIncrement,
-                useWriteBuffer || forceBufferSpill,
-                spillable || forceBufferSpill,
-                fileCompression,
-                spillCompression,
+                options.useWriteBufferForAppend() || forceBufferSpill,
+                options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode)
+                        || forceBufferSpill,
+                options.fileCompression(),
+                options.spillCompressOptions(),
                 statsCollectors,
-                maxDiskSize,
+                options.writeBufferSpillDiskSize(),
                 fileIndexOptions,
                 options.asyncFileWrite());
     }
 
+    protected abstract CompactManager getCompactManager(
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> restoredFiles,
+            ExecutorService compactExecutor,
+            @Nullable DeletionVectorsMaintainer dvMaintainer);
+
     public List<DataFileMeta> compactRewrite(
             BinaryRow partition,
             int bucket,
@@ -199,10 +153,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
         Exception collectedExceptions = null;
         RowDataRollingFileWriter rewriter =
                 createRollingFileWriter(
-                        partition,
-                        bucket,
-                        new LongCounter(toCompact.get(0).minSequenceNumber()),
-                        FileSource.COMPACT);
+                        partition, bucket, new 
LongCounter(toCompact.get(0).minSequenceNumber()));
         List<IOExceptionSupplier<DeletionVector>> dvFactories = null;
         if (dvFactory != null) {
             dvFactories = new ArrayList<>(toCompact.size());
@@ -228,19 +179,19 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
     }
 
     private RowDataRollingFileWriter createRollingFileWriter(
-            BinaryRow partition, int bucket, LongCounter seqNumCounter, 
FileSource fileSource) {
+            BinaryRow partition, int bucket, LongCounter seqNumCounter) {
         return new RowDataRollingFileWriter(
                 fileIO,
                 schemaId,
                 fileFormat,
-                targetFileSize,
+                options.targetFileSize(false),
                 rowType,
                 pathFactory.createDataFilePathFactory(partition, bucket),
                 seqNumCounter,
-                fileCompression,
+                options.fileCompression(),
                 statsCollectors,
                 fileIndexOptions,
-                fileSource,
+                FileSource.COMPACT,
                 options.asyncFileWrite());
     }
 
@@ -253,12 +204,6 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
         return new RecordReaderIterator<>(read.createReader(partition, bucket, 
files, dvFactories));
     }
 
-    @Override
-    public void withIgnorePreviousFiles(boolean ignorePrevious) {
-        // in unaware bucket mode, we need all writers to be empty
-        super.withIgnorePreviousFiles(ignorePrevious || bucketMode == 
BucketMode.BUCKET_UNAWARE);
-    }
-
     @Override
     protected void forceBufferSpill() throws Exception {
         if (ioManager == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
new file mode 100644
index 000000000..e169c0165
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.compact.NoopCompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/** {@link AppendOnlyFileStoreWrite} for {@link 
org.apache.paimon.table.BucketMode#HASH_FIXED}. */
+public class AppendOnlyFixedBucketFileStoreWrite extends 
AppendOnlyFileStoreWrite {
+
+    private final String commitUser;
+
+    public AppendOnlyFixedBucketFileStoreWrite(
+            FileIO fileIO,
+            RawFileSplitRead read,
+            long schemaId,
+            String commitUser,
+            RowType rowType,
+            FileStorePathFactory pathFactory,
+            SnapshotManager snapshotManager,
+            FileStoreScan scan,
+            CoreOptions options,
+            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+            String tableName) {
+        super(
+                fileIO,
+                read,
+                schemaId,
+                rowType,
+                pathFactory,
+                snapshotManager,
+                scan,
+                options,
+                dvMaintainerFactory,
+                tableName);
+        this.commitUser = commitUser;
+    }
+
+    @Override
+    protected CompactManager getCompactManager(
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> restoredFiles,
+            ExecutorService compactExecutor,
+            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+        if (options.writeOnly()) {
+            return new NoopCompactManager();
+        } else {
+            Function<String, DeletionVector> dvFactory =
+                    dvMaintainer != null
+                            ? f -> 
dvMaintainer.deletionVectorOf(f).orElse(null)
+                            : null;
+            return new BucketedAppendCompactManager(
+                    compactExecutor,
+                    restoredFiles,
+                    dvMaintainer,
+                    options.compactionMinFileNum(),
+                    options.compactionMaxFileNum().orElse(5),
+                    options.targetFileSize(false),
+                    files -> compactRewrite(partition, bucket, dvFactory, 
files),
+                    compactionMetrics == null
+                            ? null
+                            : compactionMetrics.createReporter(partition, 
bucket));
+        }
+    }
+
+    @Override
+    protected Function<WriterContainer<InternalRow>, Boolean> 
createWriterCleanChecker() {
+        return createConflictAwareWriterCleanChecker(commitUser, 
snapshotManager);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java
new file mode 100644
index 000000000..f33b207bb
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactManager;
+import org.apache.paimon.compact.NoopCompactManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
+
+/**
+ * {@link AppendOnlyFileStoreWrite} for {@link 
org.apache.paimon.table.BucketMode#BUCKET_UNAWARE}.
+ */
+public class AppendOnlyUnawareBucketFileStoreWrite extends 
AppendOnlyFileStoreWrite {
+
+    public AppendOnlyUnawareBucketFileStoreWrite(
+            FileIO fileIO,
+            RawFileSplitRead read,
+            long schemaId,
+            RowType rowType,
+            FileStorePathFactory pathFactory,
+            SnapshotManager snapshotManager,
+            FileStoreScan scan,
+            CoreOptions options,
+            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+            String tableName) {
+        super(
+                fileIO,
+                read,
+                schemaId,
+                rowType,
+                pathFactory,
+                snapshotManager,
+                scan,
+                options,
+                dvMaintainerFactory,
+                tableName);
+        super.withIgnorePreviousFiles(true);
+    }
+
+    @Override
+    protected CompactManager getCompactManager(
+            BinaryRow partition,
+            int bucket,
+            List<DataFileMeta> restoredFiles,
+            ExecutorService compactExecutor,
+            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+        return new NoopCompactManager();
+    }
+
+    @Override
+    public void withIgnorePreviousFiles(boolean ignorePrevious) {
+        // in unaware bucket mode, we need all writers to be empty
+        super.withIgnorePreviousFiles(true);
+    }
+
+    @Override
+    protected Function<WriterContainer<InternalRow>, Boolean> 
createWriterCleanChecker() {
+        return createNoConflictAwareWriterCleanChecker();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index b62cdabae..66819255b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -83,6 +83,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
@@ -106,6 +107,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
     private final RowType keyType;
     private final RowType valueType;
     private final RowType partitionType;
+    private final String commitUser;
     @Nullable private final RecordLevelExpire recordLevelExpire;
     @Nullable private Cache<String, LookupFile> lookupFileCache;
 
@@ -131,7 +133,6 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             KeyValueFieldsExtractor extractor,
             String tableName) {
         super(
-                commitUser,
                 snapshotManager,
                 scan,
                 options,
@@ -142,6 +143,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         this.partitionType = partitionType;
         this.keyType = keyType;
         this.valueType = valueType;
+        this.commitUser = commitUser;
+
         this.udsComparatorSupplier = udsComparatorSupplier;
         this.readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
@@ -389,6 +392,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 lookupFileCache);
     }
 
+    @Override
+    protected Function<WriterContainer<KeyValue>, Boolean> createWriterCleanChecker() {
+        return createConflictAwareWriterCleanChecker(commitUser, snapshotManager);
+    }
+
     @Override
     public void close() throws Exception {
         super.close();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 58f7174bf..386a72cf5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -59,7 +59,6 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
     private WriterBufferMetric writerBufferMetric;
 
     public MemoryFileStoreWrite(
-            String commitUser,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
@@ -67,7 +66,6 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
             @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
             String tableName) {
         super(
-                commitUser,
                 snapshotManager,
                 scan,
                 indexFactory,
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index a604b3045..7d72fe3e8 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -28,6 +28,7 @@ import org.apache.paimon.flink.sink.StateUtils;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.StoreSinkWriteState;
+import org.apache.paimon.flink.sink.StoreSinkWriteStateImpl;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.options.Options;
@@ -98,7 +99,7 @@ public class CdcRecordStoreMultiWriteOperator
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
         // TODO: should use CdcRecordMultiChannelComputer to filter
-        state = new StoreSinkWriteState(context, (tableName, partition, bucket) -> true);
+        state = new StoreSinkWriteStateImpl(context, (tableName, partition, bucket) -> true);
         tables = new HashMap<>();
         writes = new HashMap<>();
         compactExecutor =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 513b694fb..7cb5d30c2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -95,15 +95,15 @@ public class MultiTablesStoreCompactOperator
 
         catalog = catalogLoader.load();
 
-        // Each job can only have one user name and this name must be 
consistent across restarts.
-        // We cannot use job id as commit user name here because user may 
change job id by creating
+        // Each job can only have one username and this name must be 
consistent across restarts.
+        // We cannot use job id as commit username here because user may 
change job id by creating
         // a savepoint, stop the job and then resume from savepoint.
         commitUser =
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
         state =
-                new StoreSinkWriteState(
+                new StoreSinkWriteStateImpl(
                         context,
                         (tableName, partition, bucket) ->
                                 ChannelComputer.select(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
new file mode 100644
index 000000000..f975bce6e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/NoopStoreSinkWriteState.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * A {@link StoreSinkWriteState} which stores nothing. Currently only used for append only unaware
+ * bucket table writers.
+ */
+public class NoopStoreSinkWriteState implements StoreSinkWriteState {
+
+    private final StateValueFilter stateValueFilter;
+
+    public NoopStoreSinkWriteState(StateValueFilter stateValueFilter) {
+        this.stateValueFilter = stateValueFilter;
+    }
+
+    @Override
+    public StateValueFilter stateValueFilter() {
+        return stateValueFilter;
+    }
+
+    @Override
+    public @Nullable List<StateValue> get(String tableName, String key) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void put(String tableName, String key, List<StateValue> stateValues) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void snapshotState() throws Exception {}
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
index b670b905d..1cd10390c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 
 import java.util.Map;
@@ -39,6 +40,24 @@ public class RowUnawareBucketSink extends 
UnawareBucketSink<InternalRow> {
     @Override
     protected OneInputStreamOperator<InternalRow, Committable> 
createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider, commitUser);
+        return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider, commitUser) {
+
+            @Override
+            protected StoreSinkWriteState createState(
+                    StateInitializationContext context,
+                    StoreSinkWriteState.StateValueFilter stateFilter)
+                    throws Exception {
+                // No conflicts will occur in append only unaware bucket 
writer, so no state is
+                // needed.
+                return new NoopStoreSinkWriteState(stateFilter);
+            }
+
+            @Override
+            protected String getCommitUser(StateInitializationContext context) throws Exception {
+                // No conflicts will occur in append only unaware bucket 
writer, so commitUser does
+                // not matter.
+                return commitUser;
+            }
+        };
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 184288490..bc7bb350d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
@@ -29,7 +28,6 @@ import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -77,39 +75,30 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
 
-        // Each job can only have one user name and this name must be 
consistent across restarts.
-        // We cannot use job id as commit user name here because user may 
change job id by creating
+        // Each job can only have one username and this name must be 
consistent across restarts.
+        // We cannot use job id as commit username here because user may 
change job id by creating
         // a savepoint, stop the job and then resume from savepoint.
         String commitUser =
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
-        initStateAndWriter(
-                context,
-                (tableName, partition, bucket) ->
-                        ChannelComputer.select(
-                                        partition,
-                                        bucket,
-                                        
getRuntimeContext().getNumberOfParallelSubtasks())
-                                == getRuntimeContext().getIndexOfThisSubtask(),
-                getContainingTask().getEnvironment().getIOManager(),
-                commitUser);
-    }
-
-    @VisibleForTesting
-    void initStateAndWriter(
-            StateInitializationContext context,
-            StoreSinkWriteState.StateValueFilter stateFilter,
-            IOManager ioManager,
-            String commitUser)
-            throws Exception {
-        // We put state and write init in this method for convenient testing. 
Without construct a
-        // runtime context, we can test to construct a writer here
-        state = new StoreSinkWriteState(context, stateFilter);
-
+        state =
+                new StoreSinkWriteStateImpl(
+                        context,
+                        (tableName, partition, bucket) ->
+                                ChannelComputer.select(
+                                                partition,
+                                                bucket,
+                                                
getRuntimeContext().getNumberOfParallelSubtasks())
+                                        == 
getRuntimeContext().getIndexOfThisSubtask());
         write =
                 storeSinkWriteProvider.provide(
-                        table, commitUser, state, ioManager, memoryPool, 
getMetricGroup());
+                        table,
+                        commitUser,
+                        state,
+                        getContainingTask().getEnvironment().getIOManager(),
+                        memoryPool,
+                        getMetricGroup());
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index a432a5545..3a5a8df5c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -84,6 +84,10 @@ public interface StoreSinkWrite {
     @FunctionalInterface
     interface Provider extends Serializable {
 
+        /**
+         * TODO: The argument list has become too complicated. Build {@link 
TableWriteImpl} directly
+         * in caller and simplify the argument list.
+         */
         StoreSinkWrite provide(
                 FileStoreTable table,
                 String commitUser,
@@ -97,6 +101,10 @@ public interface StoreSinkWrite {
     @FunctionalInterface
     interface WithWriteBufferProvider extends Serializable {
 
+        /**
+         * TODO: The argument list has become too complicated. Build {@link 
TableWriteImpl} directly
+         * in caller and simplify the argument list.
+         */
         StoreSinkWrite provide(
                 FileStoreTable table,
                 String commitUser,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index 072d6a1b9..8626a01a6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -19,107 +19,31 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.utils.SerializationUtils;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.runtime.state.StateInitializationContext;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
- * States for {@link StoreSinkWrite}s.
- *
- * <p>States are positioned first by table name and then by key name. This 
class should be initiated
- * in a sink operator and then given to {@link StoreSinkWrite}.
+ * States for {@link StoreSinkWrite}s. It's a wrapper to conveniently modify 
states for each table
+ * stored in Flink states.
  */
-public class StoreSinkWriteState {
-
-    private final StateValueFilter stateValueFilter;
-
-    private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> 
listState;
-    private final Map<String, Map<String, List<StateValue>>> map;
-
-    @SuppressWarnings("unchecked")
-    public StoreSinkWriteState(
-            StateInitializationContext context, StateValueFilter 
stateValueFilter)
-            throws Exception {
-        this.stateValueFilter = stateValueFilter;
-        TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> 
listStateSerializer =
-                new TupleSerializer<>(
-                        (Class<Tuple5<String, String, byte[], Integer, 
byte[]>>)
-                                (Class<?>) Tuple5.class,
-                        new TypeSerializer[] {
-                            StringSerializer.INSTANCE,
-                            StringSerializer.INSTANCE,
-                            BytePrimitiveArraySerializer.INSTANCE,
-                            IntSerializer.INSTANCE,
-                            BytePrimitiveArraySerializer.INSTANCE
-                        });
-        listState =
-                context.getOperatorStateStore()
-                        .getUnionListState(
-                                new ListStateDescriptor<>(
-                                        "paimon_store_sink_write_state", 
listStateSerializer));
-
-        map = new HashMap<>();
-        for (Tuple5<String, String, byte[], Integer, byte[]> tuple : 
listState.get()) {
-            BinaryRow partition = 
SerializationUtils.deserializeBinaryRow(tuple.f2);
-            if (stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
-                map.computeIfAbsent(tuple.f0, k -> new HashMap<>())
-                        .computeIfAbsent(tuple.f1, k -> new ArrayList<>())
-                        .add(new StateValue(partition, tuple.f3, tuple.f4));
-            }
-        }
-    }
+public interface StoreSinkWriteState {
 
-    public StateValueFilter stateValueFilter() {
-        return stateValueFilter;
-    }
+    StoreSinkWriteState.StateValueFilter stateValueFilter();
 
-    public @Nullable List<StateValue> get(String tableName, String key) {
-        Map<String, List<StateValue>> innerMap = map.get(tableName);
-        return innerMap == null ? null : innerMap.get(key);
-    }
+    @Nullable
+    List<StoreSinkWriteState.StateValue> get(String tableName, String key);
 
-    public void put(String tableName, String key, List<StateValue> 
stateValues) {
-        map.computeIfAbsent(tableName, k -> new HashMap<>()).put(key, 
stateValues);
-    }
+    void put(String tableName, String key, 
List<StoreSinkWriteState.StateValue> stateValues);
 
-    public void snapshotState() throws Exception {
-        List<Tuple5<String, String, byte[], Integer, byte[]>> list = new 
ArrayList<>();
-        for (Map.Entry<String, Map<String, List<StateValue>>> tables : 
map.entrySet()) {
-            for (Map.Entry<String, List<StateValue>> entry : 
tables.getValue().entrySet()) {
-                for (StateValue stateValue : entry.getValue()) {
-                    list.add(
-                            Tuple5.of(
-                                    tables.getKey(),
-                                    entry.getKey(),
-                                    
SerializationUtils.serializeBinaryRow(stateValue.partition()),
-                                    stateValue.bucket(),
-                                    stateValue.value()));
-                }
-            }
-        }
-        listState.update(list);
-    }
+    void snapshotState() throws Exception;
 
     /**
      * A state value for {@link StoreSinkWrite}. All state values should be 
given a partition and a
      * bucket so that they can be redistributed once the sink parallelism is 
changed.
      */
-    public static class StateValue {
+    class StateValue {
 
         private final BinaryRow partition;
         private final int bucket;
@@ -148,8 +72,7 @@ public class StoreSinkWriteState {
      * Given the table name, partition and bucket of a {@link StateValue} in a 
union list state,
      * decide whether to keep this {@link StateValue} in this subtask.
      */
-    public interface StateValueFilter {
-
+    interface StateValueFilter {
         boolean filter(String tableName, BinaryRow partition, int bucket);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
similarity index 67%
copy from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
copy to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
index 072d6a1b9..a01cbcb68 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteStateImpl.java
@@ -39,21 +39,22 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * States for {@link StoreSinkWrite}s.
+ * Default implementation for {@link StoreSinkWriteState}.
  *
  * <p>States are positioned first by table name and then by key name. This 
class should be initiated
  * in a sink operator and then given to {@link StoreSinkWrite}.
  */
-public class StoreSinkWriteState {
+public class StoreSinkWriteStateImpl implements StoreSinkWriteState {
 
-    private final StateValueFilter stateValueFilter;
+    private final StoreSinkWriteState.StateValueFilter stateValueFilter;
 
     private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> 
listState;
-    private final Map<String, Map<String, List<StateValue>>> map;
+    private final Map<String, Map<String, 
List<StoreSinkWriteState.StateValue>>> map;
 
     @SuppressWarnings("unchecked")
-    public StoreSinkWriteState(
-            StateInitializationContext context, StateValueFilter 
stateValueFilter)
+    public StoreSinkWriteStateImpl(
+            StateInitializationContext context,
+            StoreSinkWriteState.StateValueFilter stateValueFilter)
             throws Exception {
         this.stateValueFilter = stateValueFilter;
         TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> 
listStateSerializer =
@@ -79,29 +80,34 @@ public class StoreSinkWriteState {
             if (stateValueFilter.filter(tuple.f0, partition, tuple.f3)) {
                 map.computeIfAbsent(tuple.f0, k -> new HashMap<>())
                         .computeIfAbsent(tuple.f1, k -> new ArrayList<>())
-                        .add(new StateValue(partition, tuple.f3, tuple.f4));
+                        .add(new StoreSinkWriteState.StateValue(partition, 
tuple.f3, tuple.f4));
             }
         }
     }
 
-    public StateValueFilter stateValueFilter() {
+    @Override
+    public StoreSinkWriteState.StateValueFilter stateValueFilter() {
         return stateValueFilter;
     }
 
-    public @Nullable List<StateValue> get(String tableName, String key) {
-        Map<String, List<StateValue>> innerMap = map.get(tableName);
+    @Override
+    public @Nullable List<StoreSinkWriteState.StateValue> get(String 
tableName, String key) {
+        Map<String, List<StoreSinkWriteState.StateValue>> innerMap = 
map.get(tableName);
         return innerMap == null ? null : innerMap.get(key);
     }
 
-    public void put(String tableName, String key, List<StateValue> 
stateValues) {
+    public void put(
+            String tableName, String key, List<StoreSinkWriteState.StateValue> 
stateValues) {
         map.computeIfAbsent(tableName, k -> new HashMap<>()).put(key, 
stateValues);
     }
 
     public void snapshotState() throws Exception {
         List<Tuple5<String, String, byte[], Integer, byte[]>> list = new 
ArrayList<>();
-        for (Map.Entry<String, Map<String, List<StateValue>>> tables : 
map.entrySet()) {
-            for (Map.Entry<String, List<StateValue>> entry : 
tables.getValue().entrySet()) {
-                for (StateValue stateValue : entry.getValue()) {
+        for (Map.Entry<String, Map<String, 
List<StoreSinkWriteState.StateValue>>> tables :
+                map.entrySet()) {
+            for (Map.Entry<String, List<StoreSinkWriteState.StateValue>> entry 
:
+                    tables.getValue().entrySet()) {
+                for (StoreSinkWriteState.StateValue stateValue : 
entry.getValue()) {
                     list.add(
                             Tuple5.of(
                                     tables.getKey(),
@@ -114,42 +120,4 @@ public class StoreSinkWriteState {
         }
         listState.update(list);
     }
-
-    /**
-     * A state value for {@link StoreSinkWrite}. All state values should be 
given a partition and a
-     * bucket so that they can be redistributed once the sink parallelism is 
changed.
-     */
-    public static class StateValue {
-
-        private final BinaryRow partition;
-        private final int bucket;
-        private final byte[] value;
-
-        public StateValue(BinaryRow partition, int bucket, byte[] value) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.value = value;
-        }
-
-        public BinaryRow partition() {
-            return partition;
-        }
-
-        public int bucket() {
-            return bucket;
-        }
-
-        public byte[] value() {
-            return value;
-        }
-    }
-
-    /**
-     * Given the table name, partition and bucket of a {@link StateValue} in a 
union list state,
-     * decide whether to keep this {@link StateValue} in this subtask.
-     */
-    public interface StateValueFilter {
-
-        boolean filter(String tableName, BinaryRow partition, int bucket);
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index f38e0ad6b..f0f5e3bc9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -24,7 +24,6 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.ChannelComputer;
 
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 
@@ -56,13 +55,6 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
 
-        // Each job can only have one user name and this name must be 
consistent across restarts.
-        // We cannot use job id as commit user name here because user may 
change job id by creating
-        // a savepoint, stop the job and then resume from savepoint.
-        String commitUser =
-                StateUtils.getSingleValueFromState(
-                        context, "commit_user_state", String.class, 
initialCommitUser);
-
         boolean containLogSystem = containLogSystem();
         int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
         StateValueFilter stateFilter =
@@ -74,27 +66,29 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
                     return task == getRuntimeContext().getIndexOfThisSubtask();
                 };
 
-        initStateAndWriter(
-                context,
-                stateFilter,
-                getContainingTask().getEnvironment().getIOManager(),
-                commitUser);
+        state = createState(context, stateFilter);
+        write =
+                storeSinkWriteProvider.provide(
+                        table,
+                        getCommitUser(context),
+                        state,
+                        getContainingTask().getEnvironment().getIOManager(),
+                        memoryPool,
+                        getMetricGroup());
     }
 
-    @VisibleForTesting
-    void initStateAndWriter(
-            StateInitializationContext context,
-            StateValueFilter stateFilter,
-            IOManager ioManager,
-            String commitUser)
+    protected StoreSinkWriteState createState(
+            StateInitializationContext context, 
StoreSinkWriteState.StateValueFilter stateFilter)
             throws Exception {
-        // We put state and write init in this method for convenient testing. 
Without construct a
-        // runtime context, we can test to construct a writer here
-        state = new StoreSinkWriteState(context, stateFilter);
+        return new StoreSinkWriteStateImpl(context, stateFilter);
+    }
 
-        write =
-                storeSinkWriteProvider.provide(
-                        table, commitUser, state, ioManager, memoryPool, 
getMetricGroup());
+    protected String getCommitUser(StateInitializationContext context) throws 
Exception {
+        // Each job can only have one username and this name must be 
consistent across restarts.
+        // We cannot use job id as commit username here because user may 
change job id by creating
+        // a savepoint, stop the job and then resume from savepoint.
+        return StateUtils.getSingleValueFromState(
+                context, "commit_user_state", String.class, initialCommitUser);
     }
 
     protected abstract boolean containLogSystem();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
index b14f1c041..cb323542d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/UnawareBucketAppendOnlyTableITCase.java
@@ -19,14 +19,25 @@
 package org.apache.paimon.flink;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.utils.FailingFileIO;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.File;
 import java.time.Duration;
@@ -346,6 +357,94 @@ public class UnawareBucketAppendOnlyTableITCase extends 
CatalogITCaseBase {
                 .containsExactlyInAnyOrder(Row.of(2, "c", "BBB"), Row.of(3, 
"c", "BBB"));
     }
 
+    @Timeout(60)
+    @Test
+    public void testStatelessWriter() throws Exception {
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(), new Path(path, 
"default.db/append_table"));
+
+        StreamExecutionEnvironment env =
+                streamExecutionEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(2)
+                        .checkpointIntervalMs(500)
+                        .build();
+        DataStream<Integer> source =
+                env.addSource(new 
TestStatelessWriterSource(table)).setParallelism(2).forward();
+
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        tEnv.registerCatalog("mycat", sEnv.getCatalog("PAIMON").get());
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.createTemporaryView("S", tEnv.fromDataStream(source).as("id"));
+
+        tEnv.executeSql("INSERT INTO append_table SELECT id, 'test' FROM 
S").await();
+        assertThat(batchSql("SELECT * FROM append_table"))
+                .containsExactlyInAnyOrder(Row.of(1, "test"), Row.of(2, 
"test"));
+    }
+
+    private static class TestStatelessWriterSource extends 
RichParallelSourceFunction<Integer> {
+
+        private final FileStoreTable table;
+
+        private volatile boolean isRunning = true;
+
+        private TestStatelessWriterSource(FileStoreTable table) {
+            this.table = table;
+        }
+
+        @Override
+        public void run(SourceContext<Integer> sourceContext) throws Exception 
{
+            int taskId = getRuntimeContext().getIndexOfThisSubtask();
+            // wait some time in parallelism #2,
+            // so that it does not commit in the same checkpoint with 
parallelism #1
+            int waitCount = (taskId == 0 ? 0 : 10);
+
+            while (isRunning) {
+                synchronized (sourceContext.getCheckpointLock()) {
+                    if (taskId == 0) {
+                        if (waitCount == 0) {
+                            sourceContext.collect(1);
+                        } else if (countNumRecords() >= 1) {
+                            // wait for the record to commit before exiting
+                            break;
+                        }
+                    } else {
+                        int numRecords = countNumRecords();
+                        if (numRecords >= 1) {
+                            if (waitCount == 0) {
+                                sourceContext.collect(2);
+                            } else if (countNumRecords() >= 2) {
+                                // make sure the next checkpoint is successful
+                                break;
+                            }
+                        }
+                    }
+                    waitCount--;
+                }
+                Thread.sleep(1000);
+            }
+        }
+
+        private int countNumRecords() throws Exception {
+            int ret = 0;
+            RecordReader<InternalRow> reader =
+                    
table.newRead().createReader(table.newSnapshotReader().read());
+            try (RecordReaderIterator<InternalRow> it = new 
RecordReaderIterator<>(reader)) {
+                while (it.hasNext()) {
+                    it.next();
+                    ret++;
+                }
+            }
+            return ret;
+        }
+
+        @Override
+        public void cancel() {
+            isRunning = false;
+        }
+    }
+
     @Override
     protected List<String> ddl() {
         return Arrays.asList(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index 84246d00a..c33556834 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -36,17 +36,15 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
-import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -90,20 +88,14 @@ public class FlinkSinkTest {
                                         ((OneInputTransformation) 
written.getTransformation())
                                                 .getOperatorFactory())
                                 .getOperator());
-        StateInitializationContextImpl context =
-                new StateInitializationContextImpl(
-                        null,
-                        new MockOperatorStateStore() {
-                            @Override
-                            public <S> ListState<S> getUnionListState(
-                                    ListStateDescriptor<S> stateDescriptor) 
throws Exception {
-                                return getListState(stateDescriptor);
-                            }
-                        },
-                        null,
-                        null,
-                        null);
-        operator.initStateAndWriter(context, (a, b, c) -> true, new 
IOManagerAsync(), "123");
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        harness.setup(serializer);
+        harness.initializeEmptyState();
+
         return ((KeyValueFileStoreWrite) ((StoreSinkWriteImpl) 
operator.write).write.getWrite())
                 .bufferSpillable();
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index eb7756718..3f2daedff 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -27,12 +27,10 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.TableTestBase;
 import org.apache.paimon.table.sink.SinkRecord;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.state.StateInitializationContextImpl;
-import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -50,36 +48,28 @@ public class StoreCompactOperatorTest extends TableTestBase 
{
 
         CompactRememberStoreWrite compactRememberStoreWrite =
                 new CompactRememberStoreWrite(streamingMode);
-        StoreCompactOperator storeCompactOperator =
+        StoreCompactOperator operator =
                 new StoreCompactOperator(
-                        (FileStoreTable) getTableDefault(),
+                        getTableDefault(),
                         (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
                                 compactRememberStoreWrite,
                         "10086");
-        storeCompactOperator.open();
-        StateInitializationContextImpl context =
-                new StateInitializationContextImpl(
-                        null,
-                        new MockOperatorStateStore() {
-                            @Override
-                            public <S> ListState<S> getUnionListState(
-                                    ListStateDescriptor<S> stateDescriptor) 
throws Exception {
-                                return getListState(stateDescriptor);
-                            }
-                        },
-                        null,
-                        null,
-                        null);
-        storeCompactOperator.initStateAndWriter(
-                context, (a, b, c) -> true, new IOManagerAsync(), "123");
-
-        storeCompactOperator.processElement(new StreamRecord<>(data(0)));
-        storeCompactOperator.processElement(new StreamRecord<>(data(0)));
-        storeCompactOperator.processElement(new StreamRecord<>(data(1)));
-        storeCompactOperator.processElement(new StreamRecord<>(data(1)));
-        storeCompactOperator.processElement(new StreamRecord<>(data(2)));
-        storeCompactOperator.prepareCommit(true, 1);
 
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        harness.setup(serializer);
+        harness.initializeEmptyState();
+        harness.open();
+
+        harness.processElement(new StreamRecord<>(data(0)));
+        harness.processElement(new StreamRecord<>(data(0)));
+        harness.processElement(new StreamRecord<>(data(1)));
+        harness.processElement(new StreamRecord<>(data(1)));
+        harness.processElement(new StreamRecord<>(data(2)));
+
+        operator.prepareCommit(true, 1);
         
Assertions.assertThat(compactRememberStoreWrite.compactTime).isEqualTo(3);
     }
 

Reply via email to