This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d01a3846b [spark] Support MarkPartitionDoneProcedure in Spark (#3746)
d01a3846b is described below

commit d01a3846b7b5ec4f5e63e22c70a6c8fa52ad7ed6
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Jul 16 09:51:57 2024 +0800

    [spark] Support MarkPartitionDoneProcedure in Spark (#3746)
---
 docs/content/spark/procedures.md                   |  14 +++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../generated/flink_connector_configuration.html   |   6 -
 .../main/java/org/apache/paimon/CoreOptions.java   |  19 +++
 .../partition/actions}/AddDonePartitionAction.java |   9 +-
 .../actions}/MarkPartitionDoneEventAction.java     |   3 +-
 .../actions}/PartitionMarkDoneAction.java          |   4 +-
 .../actions}/SuccessFileMarkDoneAction.java        |  18 ++-
 .../apache/paimon/partition/file}/SuccessFile.java |   2 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |  19 ---
 .../flink/action/MarkPartitionDoneAction.java      |   2 +-
 .../procedure/MarkPartitionDoneProcedure.java      |   2 +-
 .../flink/sink/partition/PartitionMarkDone.java    |   1 +
 .../FlinkBatchJobPartitionMarkdoneITCase.java      |   2 +-
 .../action/MarkPartitionDoneActionITCase.java      |   2 +-
 .../sink/partition/AddDonePartitionActionTest.java |   1 +
 .../partition/SuccessFileMarkDoneActionTest.java   |   2 +
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../procedure/MarkPartitionDoneProcedure.java      | 133 +++++++++++++++++++++
 .../procedure/MarkPartitionDoneProcedureTest.scala |  61 ++++++++++
 20 files changed, 269 insertions(+), 39 deletions(-)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index d464881d6..80e120740 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -201,6 +201,20 @@ This section introduce all available spark procedures 
about paimon.
          -- delete consumer<br/>
          CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
       </td>
+   </tr>
+    <tr>
+      <td>mark_partition_done</td>
+      <td>
+         To mark partitions as done. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>partitions: the partitions to mark as done. To specify multiple partitions, separate them with ';'.</li>
+      </td>
+      <td>
+         -- mark single partition done<br/>
+         CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01')<br/><br/>
+         -- mark multiple partitions done<br/>
+         CALL sys.mark_partition_done(table => 'default.T', partitions => 'day=2024-07-01;day=2024-07-02')
+      </td>
    </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9faeb0865..3e5709606 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -509,6 +509,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Duration</td>
             <td>The expiration interval of a partition. A partition will be 
expired if it‘s lifetime is over this value. Partition time is extracted from 
the partition value.</td>
         </tr>
+        <tr>
+            <td><h5>partition.mark-done-action</h5></td>
+            <td style="word-wrap: break-word;">"success-file"</td>
+            <td>String</td>
+            <td>Action to mark a partition done is to notify the downstream 
application that the partition has finished writing, the partition is ready to 
be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 
'done-partition': add 'xxx.done' partition to metastore.<br />Both can be 
configured at the same time: 'done-partition,success-file'.</td>
+        </tr>
         <tr>
             <td><h5>partition.timestamp-formatter</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 32dbe6c0f..f5c98df17 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -92,12 +92,6 @@ under the License.
             <td>Duration</td>
             <td>Set a time duration when a partition has no new data after 
this time duration, mark the done status to indicate that the data is 
ready.</td>
         </tr>
-        <tr>
-            <td><h5>partition.mark-done-action</h5></td>
-            <td style="word-wrap: break-word;">"success-file"</td>
-            <td>String</td>
-            <td>Action to mark a partition done is to notify the downstream 
application that the partition has finished writing, the partition is ready to 
be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 
'done-partition': add 'xxx.done' partition to metastore.<br />Both can be 
configured at the same time: 'done-partition,success-file'.</td>
-        </tr>
         <tr>
             <td><h5>partition.time-interval</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0ec6d59a0..d5bee40e8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1033,6 +1033,25 @@ public class CoreOptions implements Serializable {
                             "Parameter string for the constructor of class #. "
                                     + "Callback class should parse the 
parameter by itself.");
 
+    public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION =
+            key("partition.mark-done-action")
+                    .stringType()
+                    .defaultValue("success-file")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Action to mark a partition done 
is to notify the downstream application that the partition"
+                                                    + " has finished writing, 
the partition is ready to be read.")
+                                    .linebreak()
+                                    .text("1. 'success-file': add '_success' 
file to directory.")
+                                    .linebreak()
+                                    .text(
+                                            "2. 'done-partition': add 
'xxx.done' partition to metastore.")
+                                    .linebreak()
+                                    .text(
+                                            "Both can be configured at the 
same time: 'done-partition,success-file'.")
+                                    .build());
+
     public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
             key("metastore.partitioned-table")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
similarity index 89%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
rename to 
paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
index ea888c6d5..c6db6cb6e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.metastore.MetastoreClient;
@@ -25,13 +25,12 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
 import java.io.IOException;
 import java.util.LinkedHashMap;
-import java.util.Map.Entry;
+import java.util.Map;
 
 import static 
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
 
 /** A {@link PartitionMarkDoneAction} which add ".done" partition. */
 public class AddDonePartitionAction implements PartitionMarkDoneAction {
-
     private final MetastoreClient metastoreClient;
 
     public AddDonePartitionAction(MetastoreClient metastoreClient) {
@@ -41,12 +40,12 @@ public class AddDonePartitionAction implements 
PartitionMarkDoneAction {
     @Override
     public void markDone(String partition) throws Exception {
         LinkedHashMap<String, String> doneSpec = 
extractPartitionSpecFromPath(new Path(partition));
-        Entry<String, String> lastField = tailEntry(doneSpec);
+        Map.Entry<String, String> lastField = tailEntry(doneSpec);
         doneSpec.put(lastField.getKey(), lastField.getValue() + ".done");
         metastoreClient.addPartition(doneSpec);
     }
 
-    private Entry<String, String> tailEntry(LinkedHashMap<String, String> 
partitionSpec) {
+    private Map.Entry<String, String> tailEntry(LinkedHashMap<String, String> 
partitionSpec) {
         return Iterators.getLast(partitionSpec.entrySet().iterator());
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
similarity index 97%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
rename to 
paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
index a6dc0bd1e..a5ebe3405 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.metastore.MetastoreClient;
@@ -28,6 +28,7 @@ import static 
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFro
 
 /** A {@link PartitionMarkDoneAction} which add mark 
"PartitionEventType.LOAD_DONE". */
 public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction {
+
     private final MetastoreClient metastoreClient;
 
     public MarkPartitionDoneEventAction(MetastoreClient metastoreClient) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
similarity index 96%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
rename to 
paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index 2177f7d0f..f79992efb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.metastore.MetastoreClient;
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
-import static 
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
similarity index 79%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
rename to 
paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
index ac97b6f14..7c4ec375a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
@@ -16,10 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.file.SuccessFile;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
 
 /** A {@link PartitionMarkDoneAction} which create "_SUCCESS" file. */
 public class SuccessFileMarkDoneAction implements PartitionMarkDoneAction {
@@ -49,6 +55,16 @@ public class SuccessFileMarkDoneAction implements 
PartitionMarkDoneAction {
         fileIO.overwriteFileUtf8(successPath, successFile.toJson());
     }
 
+    @Nullable
+    public static SuccessFile safelyFromPath(FileIO fileIO, Path path) throws 
IOException {
+        try {
+            String json = fileIO.readFileUtf8(path);
+            return SuccessFile.fromJson(json);
+        } catch (FileNotFoundException e) {
+            return null;
+        }
+    }
+
     @Override
     public void close() {}
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
 b/paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
similarity index 98%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
rename to 
paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
index 4d3656e6f..39a202ccc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.file;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 4c5d1f762..b42de765e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -357,25 +357,6 @@ public class FlinkConnectorOptions {
                             "You can specify time interval for partition, for 
example, "
                                     + "daily partition is '1 d', hourly 
partition is '1 h'.");
 
-    public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION =
-            key("partition.mark-done-action")
-                    .stringType()
-                    .defaultValue("success-file")
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Action to mark a partition done 
is to notify the downstream application that the partition"
-                                                    + " has finished writing, 
the partition is ready to be read.")
-                                    .linebreak()
-                                    .text("1. 'success-file': add '_success' 
file to directory.")
-                                    .linebreak()
-                                    .text(
-                                            "2. 'done-partition': add 
'xxx.done' partition to metastore.")
-                                    .linebreak()
-                                    .text(
-                                            "Both can be configured at the 
same time: 'done-partition,success-file'.")
-                                    .build());
-
     public static final ConfigOption<Boolean> 
PARTITION_MARK_DONE_WHEN_END_INPUT =
             ConfigOptions.key("partition.end-input-to-done")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index f323c5509..9fd906ee4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.PartitionPathUtils;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index d846b25ee..d70cccf6b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.procedure;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.IOUtils;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 6a92a0548..ac5895767 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.utils.IOUtils;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
index 8e606b9a6..9c97151ec 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
@@ -20,10 +20,10 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.sink.partition.SuccessFile;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.file.SuccessFile;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index f2f163d6e..e6c2c8678 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -20,8 +20,8 @@ package org.apache.paimon.flink.action;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.flink.sink.partition.SuccessFile;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.file.SuccessFile;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.types.DataType;
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index cff4479d9..5338b3886 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.partition;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.partition.actions.AddDonePartitionAction;
 
 import org.junit.jupiter.api.Test;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
index 5a6818b25..95be02540 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.sink.partition;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.partition.actions.SuccessFileMarkDoneAction;
+import org.apache.paimon.partition.file.SuccessFile;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 36d508266..f143cf7b6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -26,6 +26,7 @@ import org.apache.paimon.spark.procedure.DeleteTagProcedure;
 import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
 import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
 import org.apache.paimon.spark.procedure.FastForwardProcedure;
+import org.apache.paimon.spark.procedure.MarkPartitionDoneProcedure;
 import org.apache.paimon.spark.procedure.MigrateFileProcedure;
 import org.apache.paimon.spark.procedure.MigrateTableProcedure;
 import org.apache.paimon.spark.procedure.Procedure;
@@ -70,6 +71,7 @@ public class SparkProcedures {
         procedureBuilders.put("repair", RepairProcedure::builder);
         procedureBuilders.put("fast_forward", FastForwardProcedure::builder);
         procedureBuilders.put("reset_consumer", 
ResetConsumerProcedure::builder);
+        procedureBuilders.put("mark_partition_done", 
MarkPartitionDoneProcedure::builder);
         return procedureBuilders.build();
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
new file mode 100644
index 000000000..ff064e914
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.PartitionPathUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Partition mark done procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.mark_partition_done('tableId', 'partition1;partition2')
+ *
+ *  CALL sys.mark_partition_done(table => 'tableId', partitions => 'partition1;partition2')
+ * </code></pre>
+ *
+ * <p>The actions applied to each partition come from the table's {@code
+ * partition.mark-done-action} option (see {@link PartitionMarkDoneAction#createActions}).
+ */
+public class MarkPartitionDoneProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.required("partitions", StringType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    protected MarkPartitionDoneProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    /**
+     * Marks every partition listed in the second argument as done.
+     *
+     * @param args row holding the table identifier (index 0) and the ';'-separated partition specs
+     *     (index 1)
+     * @return a single row containing {@code true} after all partitions have been marked
+     * @throws IllegalArgumentException if the target table is not a {@link FileStoreTable}
+     */
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        String[] partitions = args.getString(1).split(";");
+
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    checkArgument(
+                            table instanceof FileStoreTable,
+                            "Only FileStoreTable supports mark_partition_done procedure. "
+                                    + "The table type is '%s'.",
+                            table.getClass().getName());
+
+                    FileStoreTable fileStoreTable = (FileStoreTable) table;
+                    CoreOptions coreOptions = fileStoreTable.coreOptions();
+
+                    // Parse the partition specs first so that malformed input fails before any
+                    // action (and whatever it holds) is created.
+                    List<String> partitionPaths =
+                            PartitionPathUtils.generatePartitionPaths(
+                                    getPartitions(partitions),
+                                    fileStoreTable.store().partitionType());
+
+                    List<PartitionMarkDoneAction> actions =
+                            PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
+                    try {
+                        markDone(partitionPaths, actions);
+                    } finally {
+                        // Close the actions even when marking a partition fails.
+                        IOUtils.closeAllQuietly(actions);
+                    }
+                    return new InternalRow[] {newInternalRow(true)};
+                });
+    }
+
+    /**
+     * Applies every action to every partition path, partition by partition, in list order.
+     *
+     * @param partitions partition paths to mark as done
+     * @param actions actions to apply to each partition
+     * @throws RuntimeException wrapping the first failure; later partitions are not processed
+     */
+    public static void markDone(List<String> partitions, List<PartitionMarkDoneAction> actions) {
+        for (String partition : partitions) {
+            for (PartitionMarkDoneAction action : actions) {
+                try {
+                    action.markDone(partition);
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            String.format("Failed to mark partition '%s' as done.", partition),
+                            e);
+                }
+            }
+        }
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<MarkPartitionDoneProcedure>() {
+            @Override
+            public MarkPartitionDoneProcedure doBuild() {
+                return new MarkPartitionDoneProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "MarkPartitionDoneProcedure";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
new file mode 100644
index 000000000..8abc7ddfd
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.fs.Path
+import org.apache.paimon.partition.file.SuccessFile
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+class MarkPartitionDoneProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon procedure: mark_partition_done test") {
+    // Partitioned primary-key table whose mark-done action writes a _SUCCESS file.
+    spark.sql(s"""
+                 |CREATE TABLE T (id STRING, name STRING, day STRING)
+                 |USING PAIMON
+                 |PARTITIONED BY (day)
+                 |TBLPROPERTIES (
+                 |'primary-key'='day,id',
+                 |'partition.mark-done-action'='success-file')
+                 |""".stripMargin)
+
+    // Write one row into each partition that will be marked below.
+    spark.sql(s"INSERT INTO T VALUES ('1', 'a', '2024-07-13')")
+    spark.sql(s"INSERT INTO T VALUES ('2', 'b', '2024-07-14')")
+
+    checkAnswer(
+      spark.sql(
+        "CALL paimon.sys.mark_partition_done(" +
+          "table => 'test.T', partitions => 'day=2024-07-13;day=2024-07-14')"),
+      Row(true) :: Nil)
+
+    // Every marked partition directory must now hold a parseable _SUCCESS file.
+    val table = loadTable("T")
+    for (day <- Seq("2024-07-13", "2024-07-14")) {
+      val successPath = new Path(table.location, s"day=$day/_SUCCESS")
+      assertThat(SuccessFile.safelyFromPath(table.fileIO, successPath)).isNotNull
+    }
+  }
+
+}

Reply via email to