This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b62923dcc [core] support custom commit.user-prefix (#3474)
b62923dcc is described below

commit b62923dcc273fd4111466f8d6136ac647b3930b0
Author: wangwj <[email protected]>
AuthorDate: Fri Jun 7 10:58:51 2024 +0800

    [core] support custom commit.user-prefix (#3474)
---
 .../shortcodes/generated/core_configuration.html      |  8 +++++++-
 .../src/main/java/org/apache/paimon/CoreOptions.java  | 16 +++++++++++++++-
 .../org/apache/paimon/catalog/AbstractCatalog.java    |  8 ++++++--
 .../paimon/table/sink/BatchWriteBuilderImpl.java      |  7 +++++--
 .../paimon/table/sink/StreamWriteBuilderImpl.java     |  6 ++++--
 .../paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java |  9 +++++----
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java     |  5 ++++-
 .../flink/sink/cdc/FlinkCdcMultiTableSinkTest.java    |  4 +++-
 .../paimon/flink/action/DropPartitionAction.java      |  9 +++++++--
 .../flink/procedure/DropPartitionProcedure.java       |  9 ++++++---
 .../paimon/flink/sink/CombinedTableCompactorSink.java |  7 +++----
 .../paimon/flink/sink/DynamicBucketCompactSink.java   |  5 +++--
 .../apache/paimon/flink/sink/DynamicBucketSink.java   |  4 ++--
 .../java/org/apache/paimon/flink/sink/FlinkSink.java  |  5 ++---
 .../sink/SupportsRowLevelOperationFlinkTableSink.java |  8 ++++++--
 .../flink/sink/index/GlobalDynamicBucketSink.java     |  6 ++----
 .../java/org/apache/paimon/flink/FileStoreITCase.java | 19 +++++++++++++++++++
 .../paimon/spark/procedure/CompactProcedure.java      |  4 ++--
 18 files changed, 101 insertions(+), 38 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index fe91f2a0f..beb8a7dc9 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -110,6 +110,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to force create snapshot on commit.</td>
         </tr>
+        <tr>
+            <td><h5>commit.user-prefix</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specifies the commit user prefix.</td>
+        </tr>
         <tr>
             <td><h5>compaction.max-size-amplification-percent</h5></td>
             <td style="word-wrap: break-word;">200</td>
@@ -228,7 +234,7 @@ under the License.
             <td><h5>fields.default-aggregate-function</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
-            <td>Default aggregate function of all fields for partial-update 
and aggregate merge function</td>
+            <td>Default aggregate function of all fields for partial-update 
and aggregate merge function.</td>
         </tr>
         <tr>
             <td><h5>file-index.in-manifest-threshold</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 494e36092..5faee52b3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -51,6 +51,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.ConfigOptions.key;
@@ -1183,7 +1184,13 @@ public class CoreOptions implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Default aggregate function of all fields for 
partial-update and aggregate merge function");
+                            "Default aggregate function of all fields for 
partial-update and aggregate merge function.");
+
+    public static final ConfigOption<String> COMMIT_USER_PREFIX = // consumed by createCommitUser below
+            key("commit.user-prefix")
+                    .stringType()
+                    .noDefaultValue() // when unset, createCommitUser falls back to a bare random UUID
+                    .withDescription("Specifies the commit user prefix.");
 
     private final Options options;
 
@@ -1302,6 +1309,13 @@ public class CoreOptions implements Serializable {
         return options.get(FIELDS_DEFAULT_AGG_FUNC);
     }
 
+    public static String createCommitUser(Options options) { // returns "<prefix>_<uuid>", or "<uuid>" when no prefix is set
+        String commitUserPrefix = options.get(COMMIT_USER_PREFIX);
+        return commitUserPrefix == null // NOTE(review): "" is non-null, so an empty prefix yields "_<uuid>"
+                ? UUID.randomUUID().toString()
+                : commitUserPrefix + "_" + UUID.randomUUID(); // a fresh random UUID is generated on every call
+    }
+
     public boolean definedAggFunc() {
         if (options.contains(FIELDS_DEFAULT_AGG_FUNC)) {
             return true;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index f93198fb8..3001faf3b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -50,9 +50,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
@@ -170,7 +170,11 @@ public abstract class AbstractCatalog implements Catalog {
             throws TableNotExistException {
         Table table = getTable(identifier);
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        FileStoreCommit commit = 
fileStoreTable.store().newCommit(UUID.randomUUID().toString());
+        FileStoreCommit commit =
+                fileStoreTable
+                        .store()
+                        .newCommit(
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
         commit.dropPartitions(
                 Collections.singletonList(partitionSpec), 
BatchWriteBuilder.COMMIT_IDENTIFIER);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 87693ba0a..157de5575 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -18,13 +18,15 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
 
 import javax.annotation.Nullable;
 
 import java.util.Map;
-import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
 
 /** Implementation for {@link WriteBuilder}. */
 public class BatchWriteBuilderImpl implements BatchWriteBuilder {
@@ -32,12 +34,13 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
     private static final long serialVersionUID = 1L;
 
     private final InnerTable table;
-    private final String commitUser = UUID.randomUUID().toString();
+    private final String commitUser;
 
     private Map<String, String> staticPartition;
 
     public BatchWriteBuilderImpl(InnerTable table) {
         this.table = table;
+        this.commitUser = createCommitUser(new Options(table.options())); // honors the table's commit.user-prefix, if set
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java
index 402b0beb2..e025d6581 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/StreamWriteBuilderImpl.java
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
 
-import java.util.UUID;
+import static org.apache.paimon.CoreOptions.createCommitUser;
 
 /** Implementation for {@link WriteBuilder}. */
 public class StreamWriteBuilderImpl implements StreamWriteBuilder {
@@ -30,10 +31,11 @@ public class StreamWriteBuilderImpl implements 
StreamWriteBuilder {
 
     private final InnerTable table;
 
-    private String commitUser = UUID.randomUUID().toString();
+    private String commitUser;
 
     public StreamWriteBuilderImpl(InnerTable table) {
         this.table = table;
+        this.commitUser = createCommitUser(new Options(table.options())); // initial value; honors the table's commit.user-prefix, if set
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index aa833ef00..51cd3e220 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -46,7 +46,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.UUID;
 
 import static 
org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration;
 import static org.apache.paimon.flink.sink.FlinkSink.configureGlobalCommitter;
@@ -65,16 +64,19 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
     private final double commitCpuCores;
     @Nullable private final MemorySize commitHeapMemory;
     private final boolean commitChaining;
+    private final String commitUser;
 
     public FlinkCdcMultiTableSink(
             Catalog.Loader catalogLoader,
             double commitCpuCores,
             @Nullable MemorySize commitHeapMemory,
-            boolean commitChaining) {
+            boolean commitChaining,
+            String commitUser) { // passed to sinkFrom(input, commitUser, ...) as the initial commit user
         this.catalogLoader = catalogLoader;
         this.commitCpuCores = commitCpuCores;
         this.commitHeapMemory = commitHeapMemory;
         this.commitChaining = commitChaining;
+        this.commitUser = commitUser; // NOTE(review): FlinkCdcSyncDatabaseSinkBuilder sets its commitUser only in the options setter; confirm it cannot be null here
     }
 
     private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
@@ -99,8 +101,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
         // commit operators.
         // When the job restarts, commitUser will be recovered from states and 
this value is
         // ignored.
-        String initialCommitUser = UUID.randomUUID().toString();
-        return sinkFrom(input, initialCommitUser, createWriteProvider());
+        return sinkFrom(input, commitUser, createWriteProvider());
     }
 
     public DataStreamSink<?> sinkFrom(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index 298e06ba3..be13e7f67 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -40,6 +40,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
@@ -76,6 +77,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
     // database to sync, currently only support single database
     private String database;
     private MultiTablesSinkMode mode;
+    private String commitUser;
 
     public FlinkCdcSyncDatabaseSinkBuilder<T> withInput(DataStream<T> input) {
         this.input = input;
@@ -102,6 +104,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
         this.committerCpu = 
options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU);
         this.committerMemory = 
options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY);
         this.commitChaining = 
options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING);
+        this.commitUser = createCommitUser(options);
         return this;
     }
 
@@ -163,7 +166,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
 
         FlinkCdcMultiTableSink sink =
                 new FlinkCdcMultiTableSink(
-                        catalogLoader, committerCpu, committerMemory, 
commitChaining);
+                        catalogLoader, committerCpu, committerMemory, 
commitChaining, commitUser);
         sink.sinkFrom(partitioned);
     }
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
index ab4ac26bc..fd23e500d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.junit.jupiter.api.Test;
 
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -59,7 +60,8 @@ public class FlinkCdcMultiTableSinkTest {
                         () -> FlinkCatalogFactory.createPaimonCatalog(new 
Options()),
                         
FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(),
                         null,
-                        true);
+                        true,
+                        UUID.randomUUID().toString());
         DataStreamSink<?> dataStreamSink = sink.sinkFrom(input);
 
         // check the transformation graph
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index d6a70f375..2ead85001 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -24,7 +24,8 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
 
 /** Table drop partition action for Flink. */
 public class DropPartitionAction extends TableActionBase {
@@ -49,7 +50,11 @@ public class DropPartitionAction extends TableActionBase {
         this.partitions = partitions;
 
         FileStoreTable fileStoreTable = (FileStoreTable) table;
-        this.commit = 
fileStoreTable.store().newCommit(UUID.randomUUID().toString());
+        this.commit =
+                fileStoreTable
+                        .store()
+                        .newCommit(
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 4903d6838..76c1ea096 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -27,8 +27,7 @@ import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import java.util.UUID;
-
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -53,7 +52,11 @@ public class DropPartitionProcedure extends ProcedureBase {
 
         FileStoreTable fileStoreTable =
                 (FileStoreTable) 
catalog.getTable(Identifier.fromString(tableId));
-        FileStoreCommit commit = 
fileStoreTable.store().newCommit(UUID.randomUUID().toString());
+        FileStoreCommit commit =
+                fileStoreTable
+                        .store()
+                        .newCommit(
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
         commit.dropPartitions(
                 ParameterUtils.getPartitions(partitionStrings),
                 BatchWriteBuilder.COMMIT_IDENTIFIER);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
index d6128c249..df5396318 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java
@@ -38,8 +38,8 @@ import org.apache.flink.table.data.RowData;
 
 import java.io.Serializable;
 import java.util.Map;
-import java.util.UUID;
 
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY;
@@ -57,7 +57,6 @@ public class CombinedTableCompactorSink implements 
Serializable {
 
     private final Catalog.Loader catalogLoader;
     private final boolean ignorePreviousFiles;
-
     private final Options options;
 
     public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options 
options) {
@@ -74,8 +73,8 @@ public class CombinedTableCompactorSink implements 
Serializable {
         // commit operators.
         // When the job restarts, commitUser will be recovered from states and 
this value is
         // ignored.
-        String initialCommitUser = UUID.randomUUID().toString();
-        return sinkFrom(awareBucketTableSource, unawareBucketTableSource, 
initialCommitUser);
+        return sinkFrom(
+                awareBucketTableSource, unawareBucketTableSource, 
createCommitUser(options));
     }
 
     public DataStreamSink<?> sinkFrom(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
index 009dce0ed..147e7527f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketCompactSink.java
@@ -30,7 +30,8 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import javax.annotation.Nullable;
 
 import java.util.Map;
-import java.util.UUID;
+
+import static org.apache.paimon.CoreOptions.createCommitUser;
 
 /** This class is only used for generate compact sink topology for dynamic 
bucket table. */
 public class DynamicBucketCompactSink extends RowDynamicBucketSink {
@@ -42,7 +43,7 @@ public class DynamicBucketCompactSink extends 
RowDynamicBucketSink {
 
     @Override
     public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable 
Integer parallelism) {
-        String initialCommitUser = UUID.randomUUID().toString();
+        String initialCommitUser = 
createCommitUser(table.coreOptions().toConfiguration());
 
         // This input is sorted and compacted. So there is no shuffle here, we 
just assign bucket
         // for each record, and sink them to table.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
index a4880877d..f04043ce4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DynamicBucketSink.java
@@ -33,8 +33,8 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import javax.annotation.Nullable;
 
 import java.util.Map;
-import java.util.UUID;
 
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 
 /** Sink for dynamic bucket table. */
@@ -55,7 +55,7 @@ public abstract class DynamicBucketSink<T> extends 
FlinkWriteSink<Tuple2<T, Inte
             extractorFunction();
 
     public DataStreamSink<?> build(DataStream<T> input, @Nullable Integer 
parallelism) {
-        String initialCommitUser = UUID.randomUUID().toString();
+        String initialCommitUser = 
createCommitUser(table.coreOptions().toConfiguration());
 
         // Topology:
         // input -- shuffle by key hash --> bucket-assigner -- shuffle by 
partition & bucket -->
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 2473b9a93..131f1fbf4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -50,9 +50,9 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Queue;
 import java.util.Set;
-import java.util.UUID;
 
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
@@ -154,8 +154,7 @@ public abstract class FlinkSink<T> implements Serializable {
         // commit operators.
         // When the job restarts, commitUser will be recovered from states and 
this value is
         // ignored.
-        String initialCommitUser = UUID.randomUUID().toString();
-        return sinkFrom(input, initialCommitUser);
+        return sinkFrom(input, 
createCommitUser(table.coreOptions().toConfiguration()));
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input, String 
initialCommitUser) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 2870abcbc..e809fc22d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -53,12 +53,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Flink table sink that supports row level update and delete. */
@@ -160,8 +160,12 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
 
     @Override
     public Optional<Long> executeDeletion() {
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStoreCommit commit =
-                ((FileStoreTable) 
table).store().newCommit(UUID.randomUUID().toString());
+                fileStoreTable
+                        .store()
+                        .newCommit(
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
         long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
         if (deletePredicate == null) {
             commit.truncateTable(identifier);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index ccab48f0a..3ce562d89 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -44,8 +44,8 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.crosspartition.IndexBootstrap.bootstrapType;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SINK_CROSS_PARTITION_MANAGED_MEMORY;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -68,8 +68,6 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<InternalRow,
     }
 
     public DataStreamSink<?> build(DataStream<InternalRow> input, @Nullable 
Integer parallelism) {
-        String initialCommitUser = UUID.randomUUID().toString();
-
         TableSchema schema = table.schema();
         CoreOptions options = table.coreOptions();
         RowType rowType = schema.logicalRowType();
@@ -128,6 +126,6 @@ public class GlobalDynamicBucketSink extends 
FlinkWriteSink<Tuple2<InternalRow,
                 partition(bucketAssigned, new 
RowWithBucketChannelComputer(schema), parallelism);
 
         // 4. writer and committer
-        return sinkFrom(partitionByBucket, initialCommitUser);
+        return sinkFrom(partitionByBucket, 
createCommitUser(options.toConfiguration()));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
index 225c7f63f..6a2c7b071 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileStoreITCase.java
@@ -63,6 +63,7 @@ import 
org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -388,6 +389,24 @@ public class FileStoreITCase extends AbstractTestBase {
                 .isEqualTo(Boundedness.BOUNDED);
     }
 
+    @TestTemplate
+    public void testCommitUserWithPrefix() throws Exception { // checks that committed snapshots carry the configured commit.user-prefix
+        String commitUserPrefix = "commitUserPrefix";
+
+        FileStoreTable table = buildFileStoreTable(new int[0], new int[] {2});
+        table = // copy of the table with commit.user-prefix set in its options
+                table.copy(
+                        Collections.singletonMap(
+                                CoreOptions.COMMIT_USER_PREFIX.key(), 
commitUserPrefix));
+        // write; the sink is expected to derive its initial commit user from the table options above
+        new FlinkSinkBuilder(table).forRowData(buildTestSource(env, 
isBatch)).build();
+        env.execute();
+
+        Assertions.assertNotNull(table.snapshotManager().latestSnapshot()); // at least one snapshot was committed
+        Assertions.assertTrue( // NOTE(review): other assertions in this file appear to use AssertJ style; consider aligning
+                
table.snapshotManager().latestSnapshot().commitUser().startsWith(commitUserPrefix));
+    }
+
     private void innerTestContinuous(FileStoreTable table) throws Exception {
         assumeFalse(isBatch);
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index f6007fa32..0f64f95c2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -74,9 +74,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
@@ -310,7 +310,7 @@ public class CompactProcedure extends BaseProcedure {
             throw new RuntimeException("serialize compaction task failed");
         }
 
-        String commitUser = UUID.randomUUID().toString();
+        String commitUser = 
createCommitUser(table.coreOptions().toConfiguration());
         JavaRDD<byte[]> commitMessageJavaRDD =
                 javaSparkContext
                         .parallelize(serializedTasks)

Reply via email to