This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 484a22711 [flink] set spillable default on for batch mode (#1400)
484a22711 is described below

commit 484a2271157706d11e4c52ef09069492a9a2e118
Author: YeJunHao <[email protected]>
AuthorDate: Sun Jun 25 18:02:45 2023 +0800

    [flink] set spillable default on for batch mode (#1400)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   5 +-
 .../paimon/operation/AbstractFileStoreWrite.java   |   6 +
 .../apache/paimon/operation/FileStoreWrite.java    |   7 ++
 .../paimon/operation/KeyValueFileStoreWrite.java   |   6 +-
 .../apache/paimon/table/sink/InnerTableWrite.java  |   3 +
 .../apache/paimon/table/sink/TableWriteImpl.java   |  11 ++
 .../apache/paimon/flink/sink/CompactorSink.java    |   4 +-
 .../apache/paimon/flink/sink/FileStoreSink.java    |   2 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  11 +-
 .../flink/sink/GlobalFullCompactionSinkWrite.java  |  11 +-
 .../paimon/flink/sink/RowDynamicBucketSink.java    |   2 +-
 .../paimon/flink/sink/StoreCompactOperator.java    |   5 +-
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |   2 +
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  11 +-
 .../paimon/flink/sink/TableWriteOperator.java      |  27 +++--
 .../flink/sink/UnawareBucketCompactionSink.java    |   2 +-
 .../flink/sink/cdc/CdcDynamicBucketSink.java       |   2 +-
 .../apache/paimon/flink/sink/cdc/FlinkCdcSink.java |   2 +-
 .../apache/paimon/flink/sink/FlinkSinkTest.java    | 133 +++++++++++++++++++++
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |   9 +-
 .../sink/cdc/CdcRecordStoreWriteOperatorTest.java  |   9 +-
 21 files changed, 240 insertions(+), 30 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 265eb9bd3..aafbd2c72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -797,8 +797,9 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_BUFFER_SIZE).getBytes();
     }
 
-    public boolean writeBufferSpillable(boolean usingObjectStore) {
-        return 
options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore);
+    public boolean writeBufferSpillable(boolean usingObjectStore, boolean 
isStreaming) {
+        // Unless set explicitly, the write buffer is spillable by default in batch mode.
+        return 
options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || 
!isStreaming);
     }
 
     public Duration continuousDiscoveryInterval() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index c66a1d1ae..23443f3ad 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -71,6 +71,7 @@ public abstract class AbstractFileStoreWrite<T>
 
     private ExecutorService lazyCompactExecutor;
     private boolean ignorePreviousFiles = false;
+    protected boolean isStreamingMode = false;
 
     protected AbstractFileStoreWrite(
             String commitUser,
@@ -328,6 +329,11 @@ public abstract class AbstractFileStoreWrite<T>
         return new WriterContainer<>(writer, indexMaintainer, 
latestSnapshotId);
     }
 
+    @Override
+    public void isStreamingMode(boolean isStreamingMode) {
+        this.isStreamingMode = isStreamingMode;
+    }
+
     private List<DataFileMeta> scanExistingFileMetas(
             long snapshotId, BinaryRow partition, int bucket) {
         List<DataFileMeta> existingFileMetas = new ArrayList<>();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index d7cd64250..05009d59f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -98,6 +98,13 @@ public interface FileStoreWrite<T> {
     List<CommitMessage> prepareCommit(boolean waitCompaction, long 
commitIdentifier)
             throws Exception;
 
+    /**
+     * Sets whether this write runs in streaming mode; batch mode enables some optimizations.
+     *
+     * @param isStreamingMode whether in streaming mode
+     */
+    void isStreamingMode(boolean isStreamingMode);
+
     /**
      * Close the writer.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 86abcc947..1cb311633 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
@@ -174,8 +175,9 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 restoreIncrement);
     }
 
-    private boolean bufferSpillable() {
-        return options.writeBufferSpillable(fileIO.isObjectStore());
+    @VisibleForTesting
+    public boolean bufferSpillable() {
+        return options.writeBufferSpillable(fileIO.isObjectStore(), 
isStreamingMode);
     }
 
     private CompactManager createCompactManager(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index 69eb61f97..d0231a224 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -22,4 +22,7 @@ package org.apache.paimon.table.sink;
 public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
 
     InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
+
+    // Sets whether this write runs in streaming mode; batch mode enables some optimizations.
+    InnerTableWrite isStreamingMode(boolean isStreamingMode);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 1ee7d5640..0cfba0560 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -62,6 +62,12 @@ public class TableWriteImpl<T>
         return this;
     }
 
+    @Override
+    public TableWriteImpl<T> isStreamingMode(boolean isStreamingMode) {
+        write.isStreamingMode(isStreamingMode);
+        return this;
+    }
+
     @Override
     public TableWriteImpl<T> withIOManager(IOManager ioManager) {
         write.withIOManager(ioManager);
@@ -167,6 +173,11 @@ public class TableWriteImpl<T>
         write.restore(state);
     }
 
+    @VisibleForTesting
+    public AbstractFileStoreWrite<T> getWrite() {
+        return write;
+    }
+
     /** Extractor to extract {@link T} from the {@link SinkRecord}. */
     public interface RecordExtractor<T> {
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index b5e168df3..b4389717c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -36,8 +36,8 @@ public class CompactorSink extends FlinkSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
-        return new StoreCompactOperator(table, writeProvider, isStreaming, 
commitUser);
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new StoreCompactOperator(table, writeProvider, commitUser);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 6ef487df7..6b40815ed 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -44,7 +44,7 @@ public class FileStoreSink extends FlinkWriteSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new RowDataStoreWriteOperator(table, logSinkFunction, 
writeProvider, commitUser);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 1f402ca8f..292661f1d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -66,7 +66,8 @@ public abstract class FlinkSink<T> implements Serializable {
         this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
-    private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig 
checkpointConfig) {
+    private StoreSinkWrite.Provider createWriteProvider(
+            CheckpointConfig checkpointConfig, boolean isStreaming) {
         boolean waitCompaction;
         if (table.coreOptions().writeOnly()) {
             waitCompaction = false;
@@ -100,6 +101,7 @@ public abstract class FlinkSink<T> implements Serializable {
                                 ignorePreviousFiles,
                                 waitCompaction,
                                 finalDeltaCommits,
+                                isStreaming,
                                 memoryPool);
             }
         }
@@ -112,6 +114,7 @@ public abstract class FlinkSink<T> implements Serializable {
                         ioManager,
                         ignorePreviousFiles,
                         waitCompaction,
+                        isStreaming,
                         memoryPool);
     }
 
@@ -148,8 +151,8 @@ public abstract class FlinkSink<T> implements Serializable {
                                 createWriteOperator(
                                         createWriteProvider(
                                                 input.getExecutionEnvironment()
-                                                        
.getCheckpointConfig()),
-                                        isStreaming,
+                                                        .getCheckpointConfig(),
+                                                isStreaming),
                                         commitUser))
                         .setParallelism(parallelism == null ? 
input.getParallelism() : parallelism);
         Options options = Options.fromMap(table.options());
@@ -202,7 +205,7 @@ public abstract class FlinkSink<T> implements Serializable {
     }
 
     protected abstract OneInputStreamOperator<T, Committable> 
createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser);
+            StoreSinkWrite.Provider writeProvider, String commitUser);
 
     protected abstract SerializableFunction<String, Committer<Committable, 
ManifestCommittable>>
             createCommitterFactory(boolean streamingCheckpointEnabled);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index fd568ae05..847153ae0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -72,8 +72,17 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
             boolean ignorePreviousFiles,
             boolean waitCompaction,
             int deltaCommits,
+            boolean isStreaming,
             @Nullable MemorySegmentPool memoryPool) {
-        super(table, commitUser, state, ioManager, ignorePreviousFiles, 
waitCompaction, memoryPool);
+        super(
+                table,
+                commitUser,
+                state,
+                ioManager,
+                ignorePreviousFiles,
+                waitCompaction,
+                isStreaming,
+                memoryPool);
 
         this.deltaCommits = deltaCommits;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
index dc9bcb869..a7775b8f9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -59,7 +59,7 @@ public class RowDynamicBucketSink extends 
DynamicBucketSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable> 
createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new DynamicBucketRowWriteOperator(table, writeProvider, 
commitUser);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 975c7af68..daa851286 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -47,7 +47,6 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
 
     private final FileStoreTable table;
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
-    private final boolean isStreaming;
     private final String initialCommitUser;
 
     private transient StoreSinkWriteState state;
@@ -57,7 +56,6 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
     public StoreCompactOperator(
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
-            boolean isStreaming,
             String initialCommitUser) {
         super(Options.fromMap(table.options()));
         Preconditions.checkArgument(
@@ -65,7 +63,6 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
                 CoreOptions.WRITE_ONLY.key() + " should not be true for 
StoreCompactOperator.");
         this.table = table;
         this.storeSinkWriteProvider = storeSinkWriteProvider;
-        this.isStreaming = isStreaming;
         this.initialCommitUser = initialCommitUser;
     }
 
@@ -115,7 +112,7 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
         byte[] serializedFiles = record.getBinary(3);
         List<DataFileMeta> files = 
dataFileMetaSerializer.deserializeList(serializedFiles);
 
-        if (isStreaming) {
+        if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
             write.compact(partition, bucket, false);
         } else {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index e183ed154..3f44ec609 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -49,6 +49,8 @@ public interface StoreSinkWrite {
 
     void snapshotState() throws Exception;
 
+    boolean streamingMode();
+
     void close() throws Exception;
 
     /**
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 3b376d707..66a556c88 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -50,6 +50,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
     private final IOManager ioManager;
     private final boolean ignorePreviousFiles;
     private final boolean waitCompaction;
+    private final boolean isStreamingMode;
     @Nullable private final MemorySegmentPool memoryPool;
 
     protected TableWriteImpl<?> write;
@@ -61,12 +62,14 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             IOManager ioManager,
             boolean ignorePreviousFiles,
             boolean waitCompaction,
+            boolean isStreamingMode,
             @Nullable MemorySegmentPool memoryPool) {
         this.commitUser = commitUser;
         this.state = state;
         this.ioManager = ioManager;
         this.ignorePreviousFiles = ignorePreviousFiles;
         this.waitCompaction = waitCompaction;
+        this.isStreamingMode = isStreamingMode;
         this.memoryPool = memoryPool;
         this.write = newTableWrite(table);
     }
@@ -83,7 +86,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                                 : new HeapMemorySegmentPool(
                                         table.coreOptions().writeBufferSize(),
                                         table.coreOptions().pageSize()))
-                .withIgnorePreviousFiles(ignorePreviousFiles);
+                .withIgnorePreviousFiles(ignorePreviousFiles)
+                .isStreamingMode(isStreamingMode);
     }
 
     @Override
@@ -138,6 +142,11 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         // do nothing
     }
 
+    @Override
+    public boolean streamingMode() {
+        return isStreamingMode;
+    }
+
     @Override
     public void close() throws Exception {
         if (write != null) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index b7ed60f68..914576195 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -18,10 +18,12 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 
@@ -70,15 +72,26 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
                                     : ChannelComputer.select(partition, 
bucket, numTasks);
                     return task == getRuntimeContext().getIndexOfThisSubtask();
                 };
+
+        initStateAndWriter(
+                context,
+                stateFilter,
+                getContainingTask().getEnvironment().getIOManager(),
+                commitUser);
+    }
+
+    @VisibleForTesting
+    void initStateAndWriter(
+            StateInitializationContext context,
+            StateValueFilter stateFilter,
+            IOManager ioManager,
+            String commitUser)
+            throws Exception {
+        // Kept separate for testability: tests can initialize the state and the writer
+        // without constructing a full runtime context.
         state = new StoreSinkWriteState(context, stateFilter);
 
-        write =
-                storeSinkWriteProvider.provide(
-                        table,
-                        commitUser,
-                        state,
-                        getContainingTask().getEnvironment().getIOManager(),
-                        memoryPool);
+        write = storeSinkWriteProvider.provide(table, commitUser, state, 
ioManager, memoryPool);
     }
 
     protected abstract boolean containLogSystem();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index 1aec4e711..3c96fb03a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -44,7 +44,7 @@ public class UnawareBucketCompactionSink extends 
FlinkSink<AppendOnlyCompactionT
 
     @Override
     protected OneInputStreamOperator<AppendOnlyCompactionTask, Committable> 
createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new AppendOnlyTableCompactionWorkerOperator(table, commitUser);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
index 0b2820709..ecde6094c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -73,7 +73,7 @@ public class CdcDynamicBucketSink extends 
DynamicBucketSink<CdcRecord> {
 
     @Override
     protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> 
createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new CdcDynamicBucketWriteOperator(table, writeProvider, 
commitUser);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 9f603fe97..870bf5361 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -39,7 +39,7 @@ public class FlinkCdcSink extends FlinkWriteSink<CdcRecord> {
 
     @Override
     protected OneInputStreamOperator<CdcRecord, Committable> 
createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new CdcRecordStoreWriteOperator(table, writeProvider, 
commitUser);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
new file mode 100644
index 000000000..cf954b340
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.operation.KeyValueFileStoreWrite;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Test class for {@link FlinkSink}. */
+public class FlinkSinkTest {
+
+    @TempDir Path tempPath;
+
+    @Test
+    public void testOptimizeKeyValueWriterForBatch() throws Exception {
+        // Batch mode should enable the spillable write buffer by default.
+        FileStoreTable fileStoreTable = createFileStoreTable();
+        StreamExecutionEnvironment streamExecutionEnvironment =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Batch runtime mode: the write buffer should be spillable.
+        streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        Assertions.assertTrue(testSpillable(streamExecutionEnvironment, 
fileStoreTable));
+
+        // Streaming runtime mode: the write buffer should not be spillable.
+        
streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        Assertions.assertFalse(testSpillable(streamExecutionEnvironment, 
fileStoreTable));
+    }
+
+    private boolean testSpillable(
+            StreamExecutionEnvironment streamExecutionEnvironment, 
FileStoreTable fileStoreTable)
+            throws Exception {
+        DataStreamSource<RowData> source =
+                streamExecutionEnvironment.fromCollection(
+                        Collections.singletonList(new 
FlinkRowData(GenericRow.of(1, 1))));
+        FlinkSink<RowData> flinkSink =
+                new FileStoreSink(fileStoreTable, Lock.emptyFactory(), null, 
null);
+        SingleOutputStreamOperator<Committable> written = 
flinkSink.doWrite(source, "123", 1);
+        RowDataStoreWriteOperator operator =
+                ((RowDataStoreWriteOperator)
+                        ((SimpleOperatorFactory)
+                                        ((OneInputTransformation) 
written.getTransformation())
+                                                .getOperatorFactory())
+                                .getOperator());
+        StateInitializationContextImpl context =
+                new StateInitializationContextImpl(
+                        null,
+                        new MockOperatorStateStore() {
+                            @Override
+                            public <S> ListState<S> getUnionListState(
+                                    ListStateDescriptor<S> stateDescriptor) 
throws Exception {
+                                return getListState(stateDescriptor);
+                            }
+                        },
+                        null,
+                        null,
+                        null);
+        operator.initStateAndWriter(context, (a, b, c) -> true, new 
IOManagerAsync(), "123");
+        return ((KeyValueFileStoreWrite) ((StoreSinkWriteImpl) 
operator.write).write.getWrite())
+                .bufferSpillable();
+    }
+
+    protected static final RowType ROW_TYPE =
+            RowType.of(
+                    new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"pk", "pt0"});
+
+    private FileStoreTable createFileStoreTable() throws Exception {
+        org.apache.paimon.fs.Path tablePath = new 
org.apache.paimon.fs.Path(tempPath.toString());
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                ROW_TYPE.getFields(),
+                                Collections.emptyList(),
+                                Arrays.asList("pk"),
+                                conf.toMap(),
+                                ""));
+        return FileStoreTableFactory.create(
+                FileIOFinder.find(tablePath), tablePath, tableSchema, conf);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 37aa35a15..a01ab7fe3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -647,7 +647,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
                         catalogLoader,
                         (t, commitUser, state, ioManager, memoryPool) ->
                                 new StoreSinkWriteImpl(
-                                        t, commitUser, state, ioManager, 
false, false, memoryPool),
+                                        t,
+                                        commitUser,
+                                        state,
+                                        ioManager,
+                                        false,
+                                        false,
+                                        true,
+                                        memoryPool),
                         commitUser,
                         Options.fromMap(new HashMap<>()));
         TypeSerializer<CdcMultiplexRecord> inputSerializer = new 
JavaSerializer<>();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index f7f865a15..0f2161f58 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -257,7 +257,14 @@ public class CdcRecordStoreWriteOperatorTest {
                         table,
                         (t, commitUser, state, ioManager, memoryPool) ->
                                 new StoreSinkWriteImpl(
-                                        t, commitUser, state, ioManager, 
false, false, memoryPool),
+                                        t,
+                                        commitUser,
+                                        state,
+                                        ioManager,
+                                        false,
+                                        false,
+                                        true,
+                                        memoryPool),
                         commitUser);
         TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
         TypeSerializer<Committable> outputSerializer =

Reply via email to