This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 484a22711 [flink] set spillable default on for batch mode (#1400)
484a22711 is described below
commit 484a2271157706d11e4c52ef09069492a9a2e118
Author: YeJunHao <[email protected]>
AuthorDate: Sun Jun 25 18:02:45 2023 +0800
[flink] set spillable default on for batch mode (#1400)
---
.../main/java/org/apache/paimon/CoreOptions.java | 5 +-
.../paimon/operation/AbstractFileStoreWrite.java | 6 +
.../apache/paimon/operation/FileStoreWrite.java | 7 ++
.../paimon/operation/KeyValueFileStoreWrite.java | 6 +-
.../apache/paimon/table/sink/InnerTableWrite.java | 3 +
.../apache/paimon/table/sink/TableWriteImpl.java | 11 ++
.../apache/paimon/flink/sink/CompactorSink.java | 4 +-
.../apache/paimon/flink/sink/FileStoreSink.java | 2 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 11 +-
.../flink/sink/GlobalFullCompactionSinkWrite.java | 11 +-
.../paimon/flink/sink/RowDynamicBucketSink.java | 2 +-
.../paimon/flink/sink/StoreCompactOperator.java | 5 +-
.../apache/paimon/flink/sink/StoreSinkWrite.java | 2 +
.../paimon/flink/sink/StoreSinkWriteImpl.java | 11 +-
.../paimon/flink/sink/TableWriteOperator.java | 27 +++--
.../flink/sink/UnawareBucketCompactionSink.java | 2 +-
.../flink/sink/cdc/CdcDynamicBucketSink.java | 2 +-
.../apache/paimon/flink/sink/cdc/FlinkCdcSink.java | 2 +-
.../apache/paimon/flink/sink/FlinkSinkTest.java | 133 +++++++++++++++++++++
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 9 +-
.../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 9 +-
21 files changed, 240 insertions(+), 30 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 265eb9bd3..aafbd2c72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -797,8 +797,12 @@ public class CoreOptions implements Serializable {
         return options.get(WRITE_BUFFER_SIZE).getBytes();
     }
 
-    public boolean writeBufferSpillable(boolean usingObjectStore) {
-        return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore);
+    /**
+     * Whether the write buffer may spill to disk. Unless {@link #WRITE_BUFFER_SPILLABLE} is set
+     * explicitly, spilling is enabled for object stores and for batch (non-streaming) writes.
+     */
+    public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreaming) {
+        return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
     }
 
     public Duration continuousDiscoveryInterval() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index c66a1d1ae..23443f3ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -71,6 +71,8 @@ public abstract class AbstractFileStoreWrite<T>
 
     private ExecutorService lazyCompactExecutor;
     private boolean ignorePreviousFiles = false;
+    // Streaming vs. batch job; stays false (batch) until isStreamingMode(true) is called.
+    protected boolean isStreamingMode = false;
 
     protected AbstractFileStoreWrite(
             String commitUser,
@@ -328,6 +330,11 @@ public abstract class AbstractFileStoreWrite<T>
         return new WriterContainer<>(writer, indexMaintainer, latestSnapshotId);
     }
 
+    @Override
+    public void isStreamingMode(boolean isStreamingMode) {
+        this.isStreamingMode = isStreamingMode;
+    }
+
     private List<DataFileMeta> scanExistingFileMetas(
             long snapshotId, BinaryRow partition, int bucket) {
         List<DataFileMeta> existingFileMetas = new ArrayList<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index d7cd64250..05009d59f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -98,6 +98,14 @@ public interface FileStoreWrite<T> {
     List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
             throws Exception;
 
+    /**
+     * Sets whether this write runs in a streaming job. Batch jobs may use different defaults,
+     * e.g. a spillable write buffer when the spillable option is not set explicitly.
+     *
+     * @param isStreamingMode true for streaming execution, false for batch execution
+     */
+    void isStreamingMode(boolean isStreamingMode);
+
     /**
      * Close the writer.
      *
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 86abcc947..1cb311633 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
@@ -174,8 +175,9 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
restoreIncrement);
}
- private boolean bufferSpillable() {
- return options.writeBufferSpillable(fileIO.isObjectStore());
+ @VisibleForTesting
+ public boolean bufferSpillable() {
+ return options.writeBufferSpillable(fileIO.isObjectStore(),
isStreamingMode);
}
private CompactManager createCompactManager(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index 69eb61f97..d0231a224 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -22,4 +22,12 @@ package org.apache.paimon.table.sink;
 public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
 
     InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
+
+    /**
+     * Sets whether this write runs in a streaming job, so that batch-specific defaults can apply.
+     *
+     * @param isStreamingMode true for streaming execution, false for batch execution
+     * @return this write
+     */
+    InnerTableWrite isStreamingMode(boolean isStreamingMode);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 1ee7d5640..0cfba0560 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -62,6 +62,12 @@ public class TableWriteImpl<T>
         return this;
     }
 
+    @Override
+    public TableWriteImpl<T> isStreamingMode(boolean isStreamingMode) {
+        write.isStreamingMode(isStreamingMode);
+        return this;
+    }
+
     @Override
     public TableWriteImpl<T> withIOManager(IOManager ioManager) {
         write.withIOManager(ioManager);
@@ -167,6 +173,11 @@ public class TableWriteImpl<T>
         write.restore(state);
     }
 
+    @VisibleForTesting
+    public AbstractFileStoreWrite<T> getWrite() {
+        return write;
+    }
+
     /** Extractor to extract {@link T} from the {@link SinkRecord}. */
     public interface RecordExtractor<T> {
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index b5e168df3..b4389717c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -36,8 +36,8 @@ public class CompactorSink extends FlinkSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser) {
-        return new StoreCompactOperator(table, writeProvider, isStreaming, commitUser);
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new StoreCompactOperator(table, writeProvider, commitUser);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
index 6ef487df7..6b40815ed 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FileStoreSink.java
@@ -44,7 +44,7 @@ public class FileStoreSink extends FlinkWriteSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new RowDataStoreWriteOperator(table, logSinkFunction, writeProvider, commitUser);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 1f402ca8f..292661f1d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -66,7 +66,8 @@ public abstract class FlinkSink<T> implements Serializable {
         this.ignorePreviousFiles = ignorePreviousFiles;
     }
 
-    private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig checkpointConfig) {
+    private StoreSinkWrite.Provider createWriteProvider(
+            CheckpointConfig checkpointConfig, boolean isStreaming) {
         boolean waitCompaction;
         if (table.coreOptions().writeOnly()) {
             waitCompaction = false;
@@ -100,6 +101,7 @@ public abstract class FlinkSink<T> implements Serializable {
                                 ignorePreviousFiles,
                                 waitCompaction,
                                 finalDeltaCommits,
+                                isStreaming,
                                 memoryPool);
             }
         }
@@ -112,6 +114,7 @@ public abstract class FlinkSink<T> implements Serializable {
                         ioManager,
                         ignorePreviousFiles,
                         waitCompaction,
+                        isStreaming,
                         memoryPool);
     }
 
@@ -148,8 +151,8 @@ public abstract class FlinkSink<T> implements Serializable {
                                 createWriteOperator(
                                         createWriteProvider(
                                                 input.getExecutionEnvironment()
-                                                        .getCheckpointConfig()),
-                                        isStreaming,
+                                                        .getCheckpointConfig(),
+                                                isStreaming),
                                         commitUser))
                         .setParallelism(parallelism == null ? input.getParallelism() : parallelism);
         Options options = Options.fromMap(table.options());
@@ -202,7 +205,11 @@ public abstract class FlinkSink<T> implements Serializable {
     }
 
+    /**
+     * Creates the operator that writes the records of this sink. Whether the job runs in streaming
+     * mode is no longer passed here; it is carried by the writes that {@code writeProvider} creates.
+     */
     protected abstract OneInputStreamOperator<T, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser);
+            StoreSinkWrite.Provider writeProvider, String commitUser);
 
     protected abstract SerializableFunction<String, Committer<Committable, ManifestCommittable>>
             createCommitterFactory(boolean streamingCheckpointEnabled);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index fd568ae05..847153ae0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -72,8 +72,17 @@ public class GlobalFullCompactionSinkWrite extends StoreSinkWriteImpl {
             boolean ignorePreviousFiles,
             boolean waitCompaction,
             int deltaCommits,
+            boolean isStreaming,
             @Nullable MemorySegmentPool memoryPool) {
-        super(table, commitUser, state, ioManager, ignorePreviousFiles, waitCompaction, memoryPool);
+        super(
+                table,
+                commitUser,
+                state,
+                ioManager,
+                ignorePreviousFiles,
+                waitCompaction,
+                isStreaming,
+                memoryPool);
 
         this.deltaCommits = deltaCommits;
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
index dc9bcb869..a7775b8f9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDynamicBucketSink.java
@@ -59,7 +59,7 @@ public class RowDynamicBucketSink extends DynamicBucketSink<RowData> {
 
     @Override
     protected OneInputStreamOperator<Tuple2<RowData, Integer>, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new DynamicBucketRowWriteOperator(table, writeProvider, commitUser);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 975c7af68..daa851286 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -47,7 +47,6 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
 
     private final FileStoreTable table;
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
-    private final boolean isStreaming;
     private final String initialCommitUser;
 
     private transient StoreSinkWriteState state;
@@ -57,7 +56,6 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
     public StoreCompactOperator(
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
-            boolean isStreaming,
             String initialCommitUser) {
         super(Options.fromMap(table.options()));
         Preconditions.checkArgument(
@@ -65,7 +63,6 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
                 CoreOptions.WRITE_ONLY.key() + " should not be true for StoreCompactOperator.");
         this.table = table;
         this.storeSinkWriteProvider = storeSinkWriteProvider;
-        this.isStreaming = isStreaming;
         this.initialCommitUser = initialCommitUser;
     }
 
@@ -115,7 +112,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
         byte[] serializedFiles = record.getBinary(3);
         List<DataFileMeta> files = dataFileMetaSerializer.deserializeList(serializedFiles);
 
-        if (isStreaming) {
+        if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
             write.compact(partition, bucket, false);
         } else {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index e183ed154..3f44ec609 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -49,6 +49,9 @@ public interface StoreSinkWrite {
 
     void snapshotState() throws Exception;
 
+    /** Whether this write was created for a streaming (rather than a batch) job. */
+    boolean streamingMode();
+
     void close() throws Exception;
 
     /**
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 3b376d707..66a556c88 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -50,6 +50,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
     private final IOManager ioManager;
     private final boolean ignorePreviousFiles;
     private final boolean waitCompaction;
+    private final boolean isStreamingMode;
     @Nullable private final MemorySegmentPool memoryPool;
 
     protected TableWriteImpl<?> write;
@@ -61,12 +62,14 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             IOManager ioManager,
             boolean ignorePreviousFiles,
             boolean waitCompaction,
+            boolean isStreamingMode,
             @Nullable MemorySegmentPool memoryPool) {
         this.commitUser = commitUser;
         this.state = state;
         this.ioManager = ioManager;
         this.ignorePreviousFiles = ignorePreviousFiles;
         this.waitCompaction = waitCompaction;
+        this.isStreamingMode = isStreamingMode;
         this.memoryPool = memoryPool;
         this.write = newTableWrite(table);
     }
@@ -83,7 +86,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                                 : new HeapMemorySegmentPool(
                                         table.coreOptions().writeBufferSize(),
                                         table.coreOptions().pageSize()))
-                .withIgnorePreviousFiles(ignorePreviousFiles);
+                .withIgnorePreviousFiles(ignorePreviousFiles)
+                .isStreamingMode(isStreamingMode);
     }
 
     @Override
@@ -138,6 +142,11 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         // do nothing
     }
 
+    @Override
+    public boolean streamingMode() {
+        return isStreamingMode;
+    }
+
     @Override
     public void close() throws Exception {
         if (write != null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index b7ed60f68..914576195 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -18,10 +18,12 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.flink.sink.StoreSinkWriteState.StateValueFilter;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 
@@ -70,15 +72,28 @@ public abstract class TableWriteOperator<IN> extends PrepareCommitOperator<IN, C
                                     : ChannelComputer.select(partition, bucket, numTasks);
                     return task == getRuntimeContext().getIndexOfThisSubtask();
                 };
+
+        initStateAndWriter(
+                context,
+                stateFilter,
+                getContainingTask().getEnvironment().getIOManager(),
+                commitUser);
+    }
+
+    /**
+     * Creates the sink state and the writer. Split out so that tests can build a writer without
+     * constructing a full runtime context.
+     */
+    @VisibleForTesting
+    void initStateAndWriter(
+            StateInitializationContext context,
+            StateValueFilter stateFilter,
+            IOManager ioManager,
+            String commitUser)
+            throws Exception {
         state = new StoreSinkWriteState(context, stateFilter);
 
-        write =
-                storeSinkWriteProvider.provide(
-                        table,
-                        commitUser,
-                        state,
-                        getContainingTask().getEnvironment().getIOManager(),
-                        memoryPool);
+        write = storeSinkWriteProvider.provide(table, commitUser, state, ioManager, memoryPool);
     }
 
     protected abstract boolean containLogSystem();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index 1aec4e711..3c96fb03a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -44,7 +44,7 @@ public class UnawareBucketCompactionSink extends FlinkSink<AppendOnlyCompactionT
 
     @Override
     protected OneInputStreamOperator<AppendOnlyCompactionTask, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new AppendOnlyTableCompactionWorkerOperator(table, commitUser);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
index 0b2820709..ecde6094c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketSink.java
@@ -73,7 +73,7 @@ public class CdcDynamicBucketSink extends DynamicBucketSink<CdcRecord> {
 
     @Override
     protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
index 9f603fe97..870bf5361 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSink.java
@@ -39,7 +39,7 @@ public class FlinkCdcSink extends FlinkWriteSink<CdcRecord> {
 
     @Override
     protected OneInputStreamOperator<CdcRecord, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser) {
+            StoreSinkWrite.Provider writeProvider, String commitUser) {
         return new CdcRecordStoreWriteOperator(table, writeProvider, commitUser);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
new file mode 100644
index 000000000..cf954b340
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.operation.KeyValueFileStoreWrite;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.state.StateInitializationContextImpl;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.Collections;
+
+/** Tests for {@link FlinkSink}. */
+public class FlinkSinkTest {
+
+    protected static final RowType ROW_TYPE =
+            RowType.of(
+                    new DataType[] {DataTypes.INT(), DataTypes.INT()}, new String[] {"pk", "pt0"});
+
+    @TempDir Path tempPath;
+
+    @Test
+    public void testOptimizeKeyValueWriterForBatch() throws Exception {
+        // Batch mode: the key-value writer's write buffer should be spillable by default.
+        FileStoreTable fileStoreTable = createFileStoreTable();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        Assertions.assertTrue(isBufferSpillable(env, fileStoreTable));
+
+        // Streaming mode: spilling stays off by default (assuming the local file system is not
+        // treated as an object store).
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        Assertions.assertFalse(isBufferSpillable(env, fileStoreTable));
+    }
+
+    /**
+     * Builds the sink on {@code env}, initializes its write operator against a mock state store
+     * and returns whether the resulting key-value writer uses a spillable write buffer.
+     */
+    private boolean isBufferSpillable(
+            StreamExecutionEnvironment env, FileStoreTable fileStoreTable) throws Exception {
+        DataStreamSource<RowData> source =
+                env.fromCollection(
+                        Collections.singletonList(new FlinkRowData(GenericRow.of(1, 1))));
+        FlinkSink<RowData> flinkSink =
+                new FileStoreSink(fileStoreTable, Lock.emptyFactory(), null, null);
+        SingleOutputStreamOperator<Committable> written = flinkSink.doWrite(source, "123", 1);
+        RowDataStoreWriteOperator operator =
+                ((RowDataStoreWriteOperator)
+                        ((SimpleOperatorFactory<?>)
+                                        ((OneInputTransformation<?, ?>) written.getTransformation())
+                                                .getOperatorFactory())
+                                .getOperator());
+        StateInitializationContextImpl context =
+                new StateInitializationContextImpl(
+                        null,
+                        new MockOperatorStateStore() {
+                            @Override
+                            public <S> ListState<S> getUnionListState(
+                                    ListStateDescriptor<S> stateDescriptor) throws Exception {
+                                return getListState(stateDescriptor);
+                            }
+                        },
+                        null,
+                        null,
+                        null);
+        // Release the writer and the I/O manager once the flag has been read.
+        try (IOManagerAsync ioManager = new IOManagerAsync()) {
+            operator.initStateAndWriter(context, (a, b, c) -> true, ioManager, "123");
+            StoreSinkWriteImpl sinkWrite = (StoreSinkWriteImpl) operator.write;
+            try {
+                return ((KeyValueFileStoreWrite) sinkWrite.write.getWrite()).bufferSpillable();
+            } finally {
+                sinkWrite.close();
+            }
+        }
+    }
+
+    /** Creates a table with primary key {@code pk} under the temporary directory. */
+    private FileStoreTable createFileStoreTable() throws Exception {
+        org.apache.paimon.fs.Path tablePath = new org.apache.paimon.fs.Path(tempPath.toString());
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                ROW_TYPE.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("pk"),
+                                conf.toMap(),
+                                ""));
+        return FileStoreTableFactory.create(
+                FileIOFinder.find(tablePath), tablePath, tableSchema, conf);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 37aa35a15..a01ab7fe3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -647,7 +647,14 @@ public class CdcRecordStoreMultiWriteOperatorTest {
                         catalogLoader,
                         (t, commitUser, state, ioManager, memoryPool) ->
                                 new StoreSinkWriteImpl(
-                                        t, commitUser, state, ioManager, false, false, memoryPool),
+                                        t,
+                                        commitUser,
+                                        state,
+                                        ioManager,
+                                        false, // ignorePreviousFiles
+                                        false, // waitCompaction
+                                        true, // isStreamingMode
+                                        memoryPool),
                         commitUser,
                         Options.fromMap(new HashMap<>()));
         TypeSerializer<CdcMultiplexRecord> inputSerializer = new JavaSerializer<>();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index f7f865a15..0f2161f58 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -257,7 +257,14 @@ public class CdcRecordStoreWriteOperatorTest {
                         table,
                         (t, commitUser, state, ioManager, memoryPool) ->
                                 new StoreSinkWriteImpl(
-                                        t, commitUser, state, ioManager, false, false, memoryPool),
+                                        t,
+                                        commitUser,
+                                        state,
+                                        ioManager,
+                                        false, // ignorePreviousFiles
+                                        false, // waitCompaction
+                                        true, // isStreamingMode
+                                        memoryPool),
                         commitUser);
         TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
         TypeSerializer<Committable> outputSerializer =