This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b056bd0700 [core] Support customize action for partition mark done. 
(#4817)
b056bd0700 is described below

commit b056bd07003430c1a0da8d5b16ceea67b6f04b8d
Author: HunterXHunter <[email protected]>
AuthorDate: Mon Jan 6 20:43:56 2025 +0800

    [core] Support customize action for partition mark done. (#4817)
---
 docs/content/flink/sql-write.md                    |  18 ++++
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  17 +++-
 .../partition/actions/PartitionMarkDoneAction.java |  38 +++++++-
 .../procedure/MarkPartitionDoneProcedure.java      |   3 +-
 .../flink/action/MarkPartitionDoneAction.java      |   3 +-
 .../procedure/MarkPartitionDoneProcedure.java      |   3 +-
 .../flink/sink/partition/PartitionListeners.java   |   1 +
 .../flink/sink/partition/PartitionMarkDone.java    |   3 +-
 .../action/MarkPartitionDoneActionITCase.java      |  74 ++++++++++++++-
 .../CustomPartitionMarkDoneActionTest.java         | 104 +++++++++++++++++++++
 .../MockCustomPartitionMarkDoneAction.java         |  43 +++++++++
 .../sink/partition/PartitionMarkDoneTest.java      |  14 ++-
 .../procedure/MarkPartitionDoneProcedure.java      |   3 +-
 .../spark/commands/WriteIntoPaimonTable.scala      |   3 +-
 .../procedure/MarkPartitionDoneProcedureTest.scala |  38 ++++++++
 .../MockCustomPartitionMarkDoneAction.scala        |  45 +++++++++
 17 files changed, 399 insertions(+), 19 deletions(-)

diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md
index 6abbfa0175..b5950c6699 100644
--- a/docs/content/flink/sql-write.md
+++ b/docs/content/flink/sql-write.md
@@ -261,8 +261,26 @@ CREATE TABLE my_partitioned_table (
     'partition.time-interval'='1 d',
     'partition.idle-time-to-done'='15 m',
     'partition.mark-done-action'='done-partition'
+    -- You can also customize a PartitionMarkDoneAction to mark the partition 
completed.
+    -- 'partition.mark-done-action'='done-partition,custom',
+    -- 
'partition.mark-done-action.custom.class'='org.apache.paimon.CustomPartitionMarkDoneAction'
 );
 ```
+Define a class CustomPartitionMarkDoneAction to implement the 
PartitionMarkDoneAction interface.
+```java
+package org.apache.paimon;
+
+public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction {
+    
+    @Override
+    public void markDone(String partition) {
+        // do something.
+    }
+
+    @Override
+    public void close() {}
+}
+```
 
 1. Firstly, you need to define the time parser of the partition and the time 
interval between partitions in order to
    determine when the partition can be properly marked done.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index bf83eb687b..5e9a8139dd 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -639,7 +639,13 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td><h5>partition.mark-done-action</h5></td>
             <td style="word-wrap: break-word;">"success-file"</td>
             <td>String</td>
-            <td>Action to mark a partition done is to notify the downstream 
application that the partition has finished writing, the partition is ready to 
be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 
'done-partition': add 'xxx.done' partition to metastore.<br />3. 'mark-event': 
mark partition event to metastore.<br />Both can be configured at the same 
time: 'done-partition,success-file,mark-event'.</td>
+            <td>Action to mark a partition done is to notify the downstream 
application that the partition has finished writing, the partition is ready to 
be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 
'done-partition': add 'xxx.done' partition to metastore.<br />3. 'mark-event': 
mark partition event to metastore.<br />4. 'custom': use policy class to create 
a mark-partition policy.<br />Both can be configured at the same time: 
'done-partition,success-file,mar [...]
+        </tr>
+        <tr>
+            <td><h5>partition.mark-done-action.custom.class</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The partition mark done class for implement 
PartitionMarkDoneAction interface. Only work in custom mark-done-action.</td>
         </tr>
         <tr>
             <td><h5>partition.timestamp-formatter</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 002def9f44..eca39855b5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1212,9 +1212,20 @@ public class CoreOptions implements Serializable {
                                     .text("3. 'mark-event': mark partition 
event to metastore.")
                                     .linebreak()
                                     .text(
-                                            "Both can be configured at the 
same time: 'done-partition,success-file,mark-event'.")
+                                            "4. 'custom': use policy class to 
create a mark-partition policy.")
+                                    .linebreak()
+                                    .text(
+                                            "Both can be configured at the 
same time: 'done-partition,success-file,mark-event,custom'.")
                                     .build());
 
+    public static final ConfigOption<String> PARTITION_MARK_DONE_CUSTOM_CLASS =
+            key("partition.mark-done-action.custom.class")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The partition mark done class for implement"
+                                    + " PartitionMarkDoneAction interface. 
Only work in custom mark-done-action.");
+
     public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
             key("metastore.partitioned-table")
                     .booleanType()
@@ -2230,6 +2241,10 @@ public class CoreOptions implements Serializable {
         return options.get(PARTITION_TIMESTAMP_PATTERN);
     }
 
+    public String partitionMarkDoneCustomClass() {
+        return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS);
+    }
+
     public String consumerId() {
         String consumerId = options.get(CONSUMER_ID);
         if (consumerId != null && consumerId.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index d6b13a25e2..4bdb49823d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.partition.actions;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
 
 import java.io.Closeable;
 import java.util.Arrays;
@@ -29,29 +30,37 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Action to mark partitions done. */
 public interface PartitionMarkDoneAction extends Closeable {
 
+    String SUCCESS_FILE = "success-file";
+    String DONE_PARTITION = "done-partition";
+    String MARK_EVENT = "mark-event";
+    String CUSTOM = "custom";
+
     void markDone(String partition) throws Exception;
 
     static List<PartitionMarkDoneAction> createActions(
-            FileStoreTable fileStoreTable, CoreOptions options) {
+            ClassLoader cl, FileStoreTable fileStoreTable, CoreOptions 
options) {
         return 
Arrays.stream(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).split(","))
                 .map(
                         action -> {
-                            switch (action) {
-                                case "success-file":
+                            switch (action.toLowerCase()) {
+                                case SUCCESS_FILE:
                                     return new SuccessFileMarkDoneAction(
                                             fileStoreTable.fileIO(), 
fileStoreTable.location());
-                                case "done-partition":
+                                case DONE_PARTITION:
                                     return new AddDonePartitionAction(
                                             
createMetastoreClient(fileStoreTable, options));
-                                case "mark-event":
+                                case MARK_EVENT:
                                     return new MarkPartitionDoneEventAction(
                                             
createMetastoreClient(fileStoreTable, options));
+                                case CUSTOM:
+                                    return generateCustomMarkDoneAction(cl, 
options);
                                 default:
                                     throw new 
UnsupportedOperationException(action);
                             }
@@ -59,6 +68,25 @@ public interface PartitionMarkDoneAction extends Closeable {
                 .collect(Collectors.toList());
     }
 
+    static PartitionMarkDoneAction generateCustomMarkDoneAction(
+            ClassLoader cl, CoreOptions options) {
+        if 
(StringUtils.isNullOrWhitespaceOnly(options.partitionMarkDoneCustomClass())) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "You need to set [%s] when you add [%s] mark done 
action in your property [%s].",
+                            PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+                            CUSTOM,
+                            PARTITION_MARK_DONE_ACTION.key()));
+        }
+        String customClass = options.partitionMarkDoneCustomClass();
+        try {
+            return (PartitionMarkDoneAction) 
cl.loadClass(customClass).newInstance();
+        } catch (ClassNotFoundException | IllegalAccessException | 
InstantiationException e) {
+            throw new RuntimeException(
+                    "Can not create new instance for custom class from " + 
customClass, e);
+        }
+    }
+
     static MetastoreClient createMetastoreClient(FileStoreTable table, 
CoreOptions options) {
         MetastoreClient.Factory metastoreClientFactory =
                 table.catalogEnvironment().metastoreClientFactory();
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index d70cccf6ba..22abfb3f3b 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -64,7 +64,8 @@ public class MarkPartitionDoneProcedure extends ProcedureBase 
{
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         CoreOptions coreOptions = fileStoreTable.coreOptions();
         List<PartitionMarkDoneAction> actions =
-                PartitionMarkDoneAction.createActions(fileStoreTable, 
coreOptions);
+                PartitionMarkDoneAction.createActions(
+                        getClass().getClassLoader(), fileStoreTable, 
coreOptions);
 
         List<String> partitionPaths =
                 PartitionPathUtils.generatePartitionPaths(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index 25cd14af21..c566af0a19 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -54,7 +54,8 @@ public class MarkPartitionDoneAction extends TableActionBase {
     @Override
     public void run() throws Exception {
         List<PartitionMarkDoneAction> actions =
-                PartitionMarkDoneAction.createActions(fileStoreTable, 
fileStoreTable.coreOptions());
+                PartitionMarkDoneAction.createActions(
+                        getClass().getClassLoader(), fileStoreTable, 
fileStoreTable.coreOptions());
 
         List<String> partitionPaths =
                 PartitionPathUtils.generatePartitionPaths(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index f0a89a0bb3..d73553045b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -72,7 +72,8 @@ public class MarkPartitionDoneProcedure extends ProcedureBase 
{
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         CoreOptions coreOptions = fileStoreTable.coreOptions();
         List<PartitionMarkDoneAction> actions =
-                PartitionMarkDoneAction.createActions(fileStoreTable, 
coreOptions);
+                PartitionMarkDoneAction.createActions(
+                        procedureContext.getClass().getClassLoader(), 
fileStoreTable, coreOptions);
 
         List<String> partitionPaths =
                 PartitionPathUtils.generatePartitionPaths(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index d190b9ccf3..cbf14da456 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -64,6 +64,7 @@ public class PartitionListeners implements Closeable {
 
         // partition mark done
         PartitionMarkDone.create(
+                        context.getClass().getClassLoader(),
                         context.streamingCheckpointEnabled(),
                         context.isRestored(),
                         context.stateStore(),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 8714e0006e..6f360c7823 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -52,6 +52,7 @@ public class PartitionMarkDone implements PartitionListener {
     private final boolean waitCompaction;
 
     public static Optional<PartitionMarkDone> create(
+            ClassLoader cl,
             boolean isStreaming,
             boolean isRestored,
             OperatorStateStore stateStore,
@@ -75,7 +76,7 @@ public class PartitionMarkDone implements PartitionListener {
                 PartitionMarkDoneTrigger.create(coreOptions, isRestored, 
stateStore);
 
         List<PartitionMarkDoneAction> actions =
-                PartitionMarkDoneAction.createActions(table, coreOptions);
+                PartitionMarkDoneAction.createActions(cl, table, coreOptions);
 
         // if batch read skip level 0 files, we should wait compaction to mark 
done
         // otherwise, some data may not be readable, and there might be data 
delays
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index 3c1b73df8c..63edb8bb4c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -20,7 +20,9 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
+import 
org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
 import org.apache.paimon.partition.file.SuccessFile;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -36,8 +38,11 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link MarkPartitionDoneAction}. */
@@ -103,7 +108,7 @@ public class MarkPartitionDoneActionITCase extends 
ActionITCaseBase {
 
     @ParameterizedTest
     @MethodSource("testArguments")
-    public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, 
String invoker)
+    public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, 
String invoker)
             throws Exception {
         FileStoreTable table = prepareTable(hasPk);
 
@@ -149,7 +154,72 @@ public class MarkPartitionDoneActionITCase extends 
ActionITCaseBase {
         assertThat(successFile2).isNotNull();
     }
 
+    @ParameterizedTest
+    @MethodSource("testArguments")
+    public void testCustomPartitionMarkDoneAction(boolean hasPk, String 
invoker) throws Exception {
+
+        Map<String, String> options = new HashMap<>();
+        options.put(
+                PARTITION_MARK_DONE_ACTION.key(),
+                PartitionMarkDoneAction.SUCCESS_FILE + "," + 
PartitionMarkDoneAction.CUSTOM);
+        options.put(
+                PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+                MockCustomPartitionMarkDoneAction.class.getName());
+
+        FileStoreTable table = prepareTable(hasPk, options);
+
+        switch (invoker) {
+            case "action":
+                createAction(
+                                MarkPartitionDoneAction.class,
+                                "mark_partition_done",
+                                "--warehouse",
+                                warehouse,
+                                "--database",
+                                database,
+                                "--table",
+                                tableName,
+                                "--partition",
+                                "partKey0=0,partKey1=1",
+                                "--partition",
+                                "partKey0=1,partKey1=0")
+                        .run();
+                break;
+            case "procedure_indexed":
+                executeSQL(
+                        String.format(
+                                "CALL sys.mark_partition_done('%s.%s', 
'partKey0=0,partKey1=1;partKey0=1,partKey1=0')",
+                                database, tableName));
+                break;
+            case "procedure_named":
+                executeSQL(
+                        String.format(
+                                "CALL sys.mark_partition_done(`table` => 
'%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')",
+                                database, tableName));
+                break;
+            default:
+                throw new UnsupportedOperationException(invoker);
+        }
+
+        Path successPath1 = new Path(table.location(), 
"partKey0=0/partKey1=1/_SUCCESS");
+        SuccessFile successFile1 = SuccessFile.safelyFromPath(table.fileIO(), 
successPath1);
+        assertThat(successFile1).isNotNull();
+
+        Path successPath2 = new Path(table.location(), 
"partKey0=1/partKey1=0/_SUCCESS");
+        SuccessFile successFile2 = SuccessFile.safelyFromPath(table.fileIO(), 
successPath2);
+        assertThat(successFile2).isNotNull();
+
+        assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions())
+                .containsExactlyInAnyOrder("partKey0=0/partKey1=1/", 
"partKey0=1/partKey1=0/");
+    }
+
     private FileStoreTable prepareTable(boolean hasPk) throws Exception {
+        return prepareTable(hasPk, Collections.emptyMap());
+    }
+
+    private FileStoreTable prepareTable(boolean hasPk, Map<String, String> 
options)
+            throws Exception {
+
         FileStoreTable table =
                 createFileStoreTable(
                         ROW_TYPE,
@@ -158,7 +228,7 @@ public class MarkPartitionDoneActionITCase extends 
ActionITCaseBase {
                                 ? Arrays.asList("partKey0", "partKey1", "dt")
                                 : Collections.emptyList(),
                         hasPk ? Collections.emptyList() : 
Collections.singletonList("dt"),
-                        new HashMap<>());
+                        options);
         SnapshotManager snapshotManager = table.snapshotManager();
         StreamWriteBuilder streamWriteBuilder =
                 table.newStreamWriteBuilder().withCommitUser(commitUser);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
new file mode 100644
index 0000000000..d1599b6c57
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
+import static 
org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits;
+import static 
org.apache.paimon.partition.actions.PartitionMarkDoneAction.CUSTOM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for custom PartitionMarkDoneAction. */
+public class CustomPartitionMarkDoneActionTest extends TableTestBase {
+
+    @Test
+    public void testCustomPartitionMarkDoneAction() throws Exception {
+
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .partitionKeys("a")
+                        .primaryKey("a", "b")
+                        .option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(), 
"true")
+                        .option(PARTITION_MARK_DONE_ACTION.key(), 
"success-file,custom")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        Path location = table.location();
+        Path successFile = new Path(location, "a=0/_SUCCESS");
+
+        // Throwing the exception, if the parameter 
'partition.mark-done-action.custom.class' is not
+        // set.
+        Assertions.assertThatThrownBy(
+                        () ->
+                                PartitionMarkDone.create(
+                                        getClass().getClassLoader(),
+                                        false,
+                                        false,
+                                        new 
PartitionMarkDoneTest.MockOperatorStateStore(),
+                                        table))
+                .hasMessageContaining(
+                        String.format(
+                                "You need to set [%s] when you add [%s] mark 
done action in your property [%s].",
+                                PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+                                CUSTOM,
+                                PARTITION_MARK_DONE_ACTION.key()));
+
+        // Set parameter 'partition.mark-done-action.custom.class'.
+        catalog.alterTable(
+                identifier,
+                SchemaChange.setOption(
+                        PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+                        MockCustomPartitionMarkDoneAction.class.getName()),
+                true);
+
+        FileStoreTable table2 = (FileStoreTable) catalog.getTable(identifier);
+
+        PartitionMarkDone markDone =
+                PartitionMarkDone.create(
+                                getClass().getClassLoader(),
+                                false,
+                                false,
+                                new 
PartitionMarkDoneTest.MockOperatorStateStore(),
+                                table2)
+                        .get();
+
+        notifyCommits(markDone, false);
+
+        assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);
+
+        
assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
+                .isEqualTo("a=0/");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
new file mode 100644
index 0000000000..f8d9b40346
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/** The class is only applicable for {@link 
CustomPartitionMarkDoneActionTest}. */
+public class MockCustomPartitionMarkDoneAction implements 
PartitionMarkDoneAction {
+
+    private static final Set<String> markedDonePartitions = new HashSet<>();
+
+    @Override
+    public void markDone(String partition) {
+        MockCustomPartitionMarkDoneAction.markedDonePartitions.add(partition);
+    }
+
+    public static Set<String> getMarkedDonePartitions() {
+        return MockCustomPartitionMarkDoneAction.markedDonePartitions;
+    }
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index 9e5fe7ff9f..f737a19fa9 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -86,7 +86,13 @@ class PartitionMarkDoneTest extends TableTestBase {
         Path location = table.location();
         Path successFile = new Path(location, "a=0/_SUCCESS");
         PartitionMarkDone markDone =
-                PartitionMarkDone.create(false, false, new 
MockOperatorStateStore(), table).get();
+                PartitionMarkDone.create(
+                                getClass().getClassLoader(),
+                                false,
+                                false,
+                                new MockOperatorStateStore(),
+                                table)
+                        .get();
 
         notifyCommits(markDone, true);
         
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
@@ -97,7 +103,7 @@ class PartitionMarkDoneTest extends TableTestBase {
         }
     }
 
-    private void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
+    public static void notifyCommits(PartitionMarkDone markDone, boolean 
isCompact) {
         ManifestCommittable committable = new 
ManifestCommittable(Long.MAX_VALUE);
         DataFileMeta file = DataFileTestUtils.newFile();
         CommitMessageImpl compactMessage;
@@ -122,7 +128,7 @@ class PartitionMarkDoneTest extends TableTestBase {
         markDone.notifyCommittable(singletonList(committable));
     }
 
-    private static class MockOperatorStateStore implements OperatorStateStore {
+    public static class MockOperatorStateStore implements OperatorStateStore {
 
         @Override
         public <K, V> BroadcastState<K, V> getBroadcastState(
@@ -151,7 +157,7 @@ class PartitionMarkDoneTest extends TableTestBase {
         }
     }
 
-    private static class MockListState<T> implements ListState<T> {
+    public static class MockListState<T> implements ListState<T> {
 
         private final List<T> backingList = new ArrayList<>();
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
index ff064e9140..f5cc349b9b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
@@ -90,7 +90,8 @@ public class MarkPartitionDoneProcedure extends BaseProcedure 
{
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
                     CoreOptions coreOptions = fileStoreTable.coreOptions();
                     List<PartitionMarkDoneAction> actions =
-                            
PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
+                            PartitionMarkDoneAction.createActions(
+                                    getClass().getClassLoader(), 
fileStoreTable, coreOptions);
 
                     List<String> partitionPaths =
                             PartitionPathUtils.generatePartitionPaths(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 27d9a0786a..80dd6ae425 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -74,7 +74,8 @@ case class WriteIntoPaimonTable(
   private def markDoneIfNeeded(commitMessages: Seq[CommitMessage]): Unit = {
     val coreOptions = table.coreOptions()
     if 
(coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT))
 {
-      val actions = PartitionMarkDoneAction.createActions(table, 
table.coreOptions())
+      val actions =
+        PartitionMarkDoneAction.createActions(getClass.getClassLoader, table, 
table.coreOptions())
       val partitionComputer = new InternalRowPartitionComputer(
         coreOptions.partitionDefaultName,
         TypeUtils.project(table.rowType(), table.partitionKeys()),
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
index 8abc7ddfda..5551c75505 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.procedure
 
 import org.apache.paimon.fs.Path
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction
 import org.apache.paimon.partition.file.SuccessFile
 import org.apache.paimon.spark.PaimonSparkTestBase
 
@@ -58,4 +59,41 @@ class MarkPartitionDoneProcedureTest extends 
PaimonSparkTestBase {
 
   }
 
+  test("Paimon procedure: custom partition mark done test") {
+    spark.sql(
+      s"""
+         |CREATE TABLE T (id STRING, name STRING, day STRING)
+         |USING PAIMON
+         |PARTITIONED BY (day)
+         |TBLPROPERTIES (
+         |'primary-key'='day,id',
+         |'partition.mark-done-action'='success-file,custom',
+         
|'partition.mark-done-action.custom.class'='${classOf[MockCustomPartitionMarkDoneAction].getName}'
+         |)
+         |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES ('1', 'a', '2024-07-13')")
+    spark.sql(s"INSERT INTO T VALUES ('2', 'b', '2024-07-14')")
+
+    checkAnswer(
+      spark.sql(
+        "CALL paimon.sys.mark_partition_done(" +
+          "table => 'test.T', partitions => 'day=2024-07-13;day=2024-07-14')"),
+      Row(true) :: Nil)
+
+    val table = loadTable("T")
+
+    val successPath1 = new Path(table.location, "day=2024-07-13/_SUCCESS")
+    val successFile1 = SuccessFile.safelyFromPath(table.fileIO, successPath1)
+    assertThat(successFile1).isNotNull
+
+    val successPath2 = new Path(table.location, "day=2024-07-14/_SUCCESS")
+    val successFile2 = SuccessFile.safelyFromPath(table.fileIO, successPath2)
+    assertThat(successFile2).isNotNull
+
+    
assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions.toArray)
+      .containsExactlyInAnyOrder("day=2024-07-14/", "day=2024-07-13/")
+
+  }
+
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MockCustomPartitionMarkDoneAction.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MockCustomPartitionMarkDoneAction.scala
new file mode 100644
index 0000000000..d123d2c120
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MockCustomPartitionMarkDoneAction.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction
+
+import java.util
+
+/** The case class is only applicable for {@link 
MarkPartitionDoneProcedureTest}. */
+case class MockCustomPartitionMarkDoneAction() extends PartitionMarkDoneAction 
{
+
+  override def markDone(partition: String): Unit = {
+    MockCustomPartitionMarkDoneAction.add(partition)
+  }
+
+  override def close(): Unit = {}
+}
+
+object MockCustomPartitionMarkDoneAction {
+  val markedDonePartitions = new util.HashSet[String]
+
+  def add(partition: String): Unit = {
+    markedDonePartitions.add(partition)
+  }
+
+  def getMarkedDonePartitions: util.Set[String] = {
+    markedDonePartitions
+  }
+}


Reply via email to