This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b62923dcc [core] support custom commit.user-prefix (#3474)
b62923dcc is described below
commit b62923dcc273fd4111466f8d6136ac647b3930b0
Author: wangwj <[email protected]>
AuthorDate: Fri Jun 7 10:58:51 2024 +0800
[core] support custom commit.user-prefix (#3474)
---
.../shortcodes/generated/core_configuration.html | 8 +++++++-
.../src/main/java/org/apache/paimon/CoreOptions.java | 16 +++++++++++++++-
.../org/apache/paimon/catalog/AbstractCatalog.java | 8 ++++++--
.../paimon/table/sink/BatchWriteBuilderImpl.java | 7 +++++--
.../paimon/table/sink/StreamWriteBuilderImpl.java | 6 ++++--
.../paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java | 9 +++++----
.../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 5 ++++-
.../flink/sink/cdc/FlinkCdcMultiTableSinkTest.java | 4 +++-
.../paimon/flink/action/DropPartitionAction.java | 9 +++++++--
.../flink/procedure/DropPartitionProcedure.java | 9 ++++++---
.../paimon/flink/sink/CombinedTableCompactorSink.java | 7 +++----
.../paimon/flink/sink/DynamicBucketCompactSink.java | 5 +++--
.../apache/paimon/flink/sink/DynamicBucketSink.java | 4 ++--
.../java/org/apache/paimon/flink/sink/FlinkSink.java | 5 ++---
.../sink/SupportsRowLevelOperationFlinkTableSink.java | 8 ++++++--
.../flink/sink/index/GlobalDynamicBucketSink.java | 6 ++----
.../java/org/apache/paimon/flink/FileStoreITCase.java | 19 +++++++++++++++++++
.../paimon/spark/procedure/CompactProcedure.java | 4 ++--
18 files changed, 101 insertions(+), 38 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index fe91f2a0f..beb8a7dc9 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@ under the License.
<td>Boolean</td>
<td>Whether to force create snapshot on commit.</td>
</tr>
+ <tr>
+ <td><h5>commit.user-prefix</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specifies the commit user prefix.</td>
+ </tr>
<tr>
<td><h5>compaction.max-size-amplification-percent</h5></td>
<td style="word-wrap: break-word;">200</td>
@@ -228,7 +234,7 @@ under the License.
<td><h5>fields.default-aggregate-function</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
- <td>Default aggregate function of all fields for partial-update
and aggregate merge function</td>
+ <td>Default aggregate function of all fields for partial-update
and aggregate merge function.</td>
</tr>
<tr>
<td><h5>file-index.in-manifest-threshold</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 494e36092..5faee52b3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -51,6 +51,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.paimon.options.ConfigOptions.key;
@@ -1183,7 +1184,13 @@ public class CoreOptions implements Serializable {
.stringType()
.noDefaultValue()
.withDescription(
- "Default aggregate function of all fields for
partial-update and aggregate merge function");
+ "Default aggregate function of all fields for
partial-update and aggregate merge function.");
+
+ public static final ConfigOption<String> COMMIT_USER_PREFIX =
+ key("commit.user-prefix")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Specifies the commit user prefix.");
private final Options options;
@@ -1302,6 +1309,13 @@ public class CoreOptions implements Serializable {
return options.get(FIELDS_DEFAULT_AGG_FUNC);
}
+ public static String createCommitUser(Options options) {
+ String commitUserPrefix = options.get(COMMIT_USER_PREFIX);
+ return commitUserPrefix == null
+ ? UUID.randomUUID().toString()
+ : commitUserPrefix + "_" + UUID.randomUUID();
+ }
+
public boolean definedAggFunc() {
if (options.contains(FIELDS_DEFAULT_AGG_FUNC)) {
return true;
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index f93198fb8..3001faf3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -50,9 +50,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
@@ -170,7 +170,11 @@ public abstract class AbstractCatalog implements Catalog {
throws TableNotExistException {
Table table = getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
- FileStoreCommit commit =
fileStoreTable.store().newCommit(UUID.randomUUID().toString());
+ FileStoreCommit commit =
+ fileStoreTable
+ .store()
+ .newCommit(
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
commit.dropPartitions(
Collections.singletonList(partitionSpec),
BatchWriteBuilder.COMMIT_IDENTIFIER);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 87693ba0a..157de5575 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -18,13 +18,15 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
import javax.annotation.Nullable;
import java.util.Map;
-import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
/** Implementation for {@link WriteBuilder}. */
public class BatchWriteBuilderImpl implements BatchWriteBuilder {
@@ -32,12 +34,13 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
private static final long serialVersionUID = 1L;
private final InnerTable table;
- private final String commitUser = UUID.randomUUID().toString();
+ private final String commitUser;
private Map<String, String> staticPartition;
public BatchWriteBuilderImpl(InnerTable table) {
this.table = table;
+ this.commitUser = createCommitUser(new Options(table.options()));
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java
index 402b0beb2..e025d6581 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java
@@ -18,10 +18,11 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.InnerTable;
import org.apache.paimon.types.RowType;
-import java.util.UUID;
+import static org.apache.paimon.CoreOptions.createCommitUser;
/** Implementation for {@link WriteBuilder}. */
public class StreamWriteBuilderImpl implements StreamWriteBuilder {
@@ -30,10 +31,11 @@ public class StreamWriteBuilderImpl implements StreamWriteBuilder {
private final InnerTable table;
- private String commitUser = UUID.randomUUID().toString();
+ private String commitUser;
public StreamWriteBuilderImpl(InnerTable table) {
this.table = table;
+ this.commitUser = createCommitUser(new Options(table.options()));
}
@Override
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index aa833ef00..51cd3e220 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -46,7 +46,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import javax.annotation.Nullable;
import java.io.Serializable;
-import java.util.UUID;
import static
org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
@@ -65,16 +64,19 @@ public class FlinkCdcMultiTableSink implements Serializable {
private final double commitCpuCores;
@Nullable private final MemorySize commitHeapMemory;
private final boolean commitChaining;
+ private final String commitUser;
public FlinkCdcMultiTableSink(
Catalog.Loader catalogLoader,
double commitCpuCores,
@Nullable MemorySize commitHeapMemory,
- boolean commitChaining) {
+ boolean commitChaining,
+ String commitUser) {
this.catalogLoader = catalogLoader;
this.commitCpuCores = commitCpuCores;
this.commitHeapMemory = commitHeapMemory;
this.commitChaining = commitChaining;
+ this.commitUser = commitUser;
}
private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
@@ -99,8 +101,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
// commit operators.
// When the job restarts, commitUser will be recovered from states and
this value is
// ignored.
- String initialCommitUser = UUID.randomUUID().toString();
- return sinkFrom(input, initialCommitUser, createWriteProvider());
+ return sinkFrom(input, commitUser, createWriteProvider());
}
public DataStreamSink<?> sinkFrom(
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 298e06ba3..be13e7f67 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -76,6 +77,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
// database to sync, currently only support single database
private String database;
private MultiTablesSinkMode mode;
+ private String commitUser;
public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
this.input = input;
@@ -102,6 +104,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
this.committerCpu =
options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
this.committerMemory =
options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
this.commitChaining =
options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING);
+ this.commitUser = createCommitUser(options);
return this;
}
@@ -163,7 +166,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
FlinkCdcMultiTableSink sink =
new FlinkCdcMultiTableSink(
- catalogLoader, committerCpu, committerMemory,
commitChaining);
+ catalogLoader, committerCpu, committerMemory,
commitChaining, commitUser);
sink.sinkFrom(partitioned);
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
index ab4ac26bc..fd23e500d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.junit.jupiter.api.Test;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -59,7 +60,8 @@ public class FlinkCdcMultiTableSinkTest {
() -> FlinkCatalogFactory.createPaimonCatalog(new
Options()),
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
null,
- true);
+ true,
+ UUID.randomUUID().toString());
DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);
// check the transformation graph
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index d6a70f375..2ead85001 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -24,7 +24,8 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
/** Table drop partition action for Flink. */
public class DropPartitionAction extends TableActionBase {
@@ -49,7 +50,11 @@ public class DropPartitionAction extends TableActionBase {
this.partitions = partitions;
FileStoreTable fileStoreTable = (FileStoreTable) table;
- this.commit =
fileStoreTable.store().newCommit(UUID.randomUUID().toString());
+ this.commit =
+ fileStoreTable
+ .store()
+ .newCommit(
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 4903d6838..76c1ea096 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -27,8 +27,7 @@ import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.procedure.ProcedureContext;
-import java.util.UUID;
-
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -53,7 +52,11 @@ public class DropPartitionProcedure extends ProcedureBase {
FileStoreTable fileStoreTable =
(FileStoreTable)
catalog.getTable(Identifier.fromString(tableId));
- FileStoreCommit commit =
fileStoreTable.store().newCommit(UUID.randomUUID().toString());
+ FileStoreCommit commit =
+ fileStoreTable
+ .store()
+ .newCommit(
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
commit.dropPartitions(
ParameterUtils.getPartitions(partitionStrings),
BatchWriteBuilder.COMMIT_IDENTIFIER);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index d6128c249..df5396318 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -38,8 +38,8 @@ import org.apache.flink.table.data.RowData;
import java.io.Serializable;
import java.util.Map;
-import java.util.UUID;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
@@ -57,7 +57,6 @@ public class CombinedTableCompactorSink implements Serializable {
private final Catalog.Loader catalogLoader;
private final boolean ignorePreviousFiles;
-
private final Options options;
public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options
options) {
@@ -74,8 +73,8 @@ public class CombinedTableCompactorSink implements Serializable {
// commit operators.
// When the job restarts, commitUser will be recovered from states and
this value is
// ignored.
- String initialCommitUser = UUID.randomUUID().toString();
- return sinkFrom(awareBucketTableSource, unawareBucketTableSource,
initialCommitUser);
+ return sinkFrom(
+ awareBucketTableSource, unawareBucketTableSource,
createCommitUser(options));
}
public DataStreamSink<?> sinkFrom(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
index 009dce0ed..147e7527f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
@@ -30,7 +30,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import javax.annotation.Nullable;
import java.util.Map;
-import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
/** This class is only used for generate compact sink topology for dynamic
bucket table. */
public class DynamicBucketCompactSink extends RowDynamicBucketSink {
@@ -42,7 +43,7 @@ public class DynamicBucketCompactSink extends RowDynamicBucketSink {
@Override
public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable
Integer parallelism) {
- String initialCommitUser = UUID.randomUUID().toString();
+ String initialCommitUser =
createCommitUser(table.coreOptions().toConfiguration());
// This input is sorted and compacted. So there is no shuffle here, we
just assign bucket
// for each record, and sink them to table.
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index a4880877d..f04043ce4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -33,8 +33,8 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import javax.annotation.Nullable;
import java.util.Map;
-import java.util.UUID;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
/** Sink for dynamic bucket table. */
@@ -55,7 +55,7 @@ public abstract class DynamicBucketSink<T> extends FlinkWriteSink<Tuple2<T, Inte
extractorFunction();
public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer
parallelism) {
- String initialCommitUser = UUID.randomUUID().toString();
+ String initialCommitUser =
createCommitUser(table.coreOptions().toConfiguration());
// Topology:
// input -- shuffle by key hash --> bucket-assigner -- shuffle by
partition & bucket -->
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 2473b9a93..131f1fbf4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -50,9 +50,9 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
-import java.util.UUID;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -154,8 +154,7 @@ public abstract class FlinkSink<T> implements Serializable {
// commit operators.
// When the job restarts, commitUser will be recovered from states and
this value is
// ignored.
- String initialCommitUser = UUID.randomUUID().toString();
- return sinkFrom(input, initialCommitUser);
+ return sinkFrom(input,
createCommitUser(table.coreOptions().toConfiguration()));
}
public DataStreamSink<?> sinkFrom(DataStream<T> input, String
initialCommitUser) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 2870abcbc..e809fc22d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -53,12 +53,12 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Flink table sink that supports row level update and delete. */
@@ -160,8 +160,12 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
@Override
public Optional<Long> executeDeletion() {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStoreCommit commit =
- ((FileStoreTable)
table).store().newCommit(UUID.randomUUID().toString());
+ fileStoreTable
+ .store()
+ .newCommit(
+
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
if (deletePredicate == null) {
commit.truncateTable(identifier);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index ccab48f0a..3ce562d89 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -44,8 +44,8 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
-import java.util.UUID;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.crosspartition.IndexBootstrap.bootstrapType;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SINK_CROSS_PARTITION_MANAGED_MEMORY;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -68,8 +68,6 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow,
}
public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable
Integer parallelism) {
- String initialCommitUser = UUID.randomUUID().toString();
-
TableSchema schema = table.schema();
CoreOptions options = table.coreOptions();
RowType rowType = schema.logicalRowType();
@@ -128,6 +126,6 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<InternalRow,
partition(bucketAssigned, new
RowWithBucketChannelComputer(schema), parallelism);
// 4. writer and committer
- return sinkFrom(partitionByBucket, initialCommitUser);
+ return sinkFrom(partitionByBucket,
createCommitUser(options.toConfiguration()));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 225c7f63f..6a2c7b071 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -63,6 +63,7 @@ import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -388,6 +389,24 @@ public class FileStoreITCase extends AbstractTestBase {
.isEqualTo(Boundedness.BOUNDED);
}
+ @TestTemplate
+ public void testCommitUserWithPrefix() throws Exception {
+ String commitUserPrefix = "commitUserPrefix";
+
+ FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});
+ table =
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.COMMIT_USER_PREFIX.key(),
commitUserPrefix));
+ // write
+ new FlinkSinkBuilder(table).forRowData(buildTestSource(env,
isBatch)).build();
+ env.execute();
+
+ Assertions.assertNotNull(table.snapshotManager().latestSnapshot());
+ Assertions.assertTrue(
+
table.snapshotManager().latestSnapshot().commitUser().startsWith(commitUserPrefix));
+ }
+
private void innerTestContinuous(FileStoreTable table) throws Exception {
assumeFalse(isBatch);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index f6007fa32..0f64f95c2 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -74,9 +74,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -310,7 +310,7 @@ public class CompactProcedure extends BaseProcedure {
throw new RuntimeException("serialize compaction task failed");
}
- String commitUser = UUID.randomUUID().toString();
+ String commitUser =
createCommitUser(table.coreOptions().toConfiguration());
JavaRDD<byte[]> commitMessageJavaRDD =
javaSparkContext
.parallelize(serializedTasks)