This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d01a3846b [spark] Support MarkPartitionDoneProcedure in Spark (#3746)
d01a3846b is described below
commit d01a3846b7b5ec4f5e63e22c70a6c8fa52ad7ed6
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Jul 16 09:51:57 2024 +0800
[spark] Support MarkPartitionDoneProcedure in Spark (#3746)
---
docs/content/spark/procedures.md | 14 +++
.../shortcodes/generated/core_configuration.html | 6 +
.../generated/flink_connector_configuration.html | 6 -
.../main/java/org/apache/paimon/CoreOptions.java | 19 +++
.../partition/actions}/AddDonePartitionAction.java | 9 +-
.../actions}/MarkPartitionDoneEventAction.java | 3 +-
.../actions}/PartitionMarkDoneAction.java | 4 +-
.../actions}/SuccessFileMarkDoneAction.java | 18 ++-
.../apache/paimon/partition/file}/SuccessFile.java | 2 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 19 ---
.../flink/action/MarkPartitionDoneAction.java | 2 +-
.../procedure/MarkPartitionDoneProcedure.java | 2 +-
.../flink/sink/partition/PartitionMarkDone.java | 1 +
.../FlinkBatchJobPartitionMarkdoneITCase.java | 2 +-
.../action/MarkPartitionDoneActionITCase.java | 2 +-
.../sink/partition/AddDonePartitionActionTest.java | 1 +
.../partition/SuccessFileMarkDoneActionTest.java | 2 +
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../procedure/MarkPartitionDoneProcedure.java | 133 +++++++++++++++++++++
.../procedure/MarkPartitionDoneProcedureTest.scala | 61 ++++++++++
20 files changed, 269 insertions(+), 39 deletions(-)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index d464881d6..80e120740 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -201,6 +201,20 @@ This section introduce all available spark procedures
about paimon.
-- delete consumer<br/>
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
</td>
+ </tr>
+ <tr>
+ <td>mark_partition_done</td>
+ <td>
+ Marks the given partitions as done. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>partitions: the partitions that need to be marked done. If you specify
multiple partitions, the delimiter is ';'.</li>
+ </td>
+ <td>
+ -- mark single partition done<br/>
+ CALL sys.mark_partition_done(table => 'default.T', partitions =>
'day=2024-07-01')<br/><br/>
+ -- mark multiple partitions done<br/>
+ CALL sys.mark_partition_done(table => 'default.T', partitions =>
'day=2024-07-01;day=2024-07-02')
+ </td>
</tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9faeb0865..3e5709606 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -509,6 +509,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Duration</td>
<td>The expiration interval of a partition. A partition will be
expired if it‘s lifetime is over this value. Partition time is extracted from
the partition value.</td>
</tr>
+ <tr>
+ <td><h5>partition.mark-done-action</h5></td>
+ <td style="word-wrap: break-word;">"success-file"</td>
+ <td>String</td>
+ <td>Action to mark a partition done is to notify the downstream
application that the partition has finished writing, the partition is ready to
be read.<br />1. 'success-file': add '_success' file to directory.<br />2.
'done-partition': add 'xxx.done' partition to metastore.<br />Both can be
configured at the same time: 'done-partition,success-file'.</td>
+ </tr>
<tr>
<td><h5>partition.timestamp-formatter</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 32dbe6c0f..f5c98df17 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -92,12 +92,6 @@ under the License.
<td>Duration</td>
<td>Set a time duration when a partition has no new data after
this time duration, mark the done status to indicate that the data is
ready.</td>
</tr>
- <tr>
- <td><h5>partition.mark-done-action</h5></td>
- <td style="word-wrap: break-word;">"success-file"</td>
- <td>String</td>
- <td>Action to mark a partition done is to notify the downstream
application that the partition has finished writing, the partition is ready to
be read.<br />1. 'success-file': add '_success' file to directory.<br />2.
'done-partition': add 'xxx.done' partition to metastore.<br />Both can be
configured at the same time: 'done-partition,success-file'.</td>
- </tr>
<tr>
<td><h5>partition.time-interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 0ec6d59a0..d5bee40e8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1033,6 +1033,25 @@ public class CoreOptions implements Serializable {
"Parameter string for the constructor of class #. "
+ "Callback class should parse the
parameter by itself.");
+ public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION =
+ key("partition.mark-done-action")
+ .stringType()
+ .defaultValue("success-file")
+ .withDescription(
+ Description.builder()
+ .text(
+ "Action to mark a partition done
is to notify the downstream application that the partition"
+ + " has finished writing,
the partition is ready to be read.")
+ .linebreak()
+ .text("1. 'success-file': add '_success'
file to directory.")
+ .linebreak()
+ .text(
+ "2. 'done-partition': add
'xxx.done' partition to metastore.")
+ .linebreak()
+ .text(
+ "Both can be configured at the
same time: 'done-partition,success-file'.")
+ .build());
+
public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
similarity index 89%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
rename to
paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
index ea888c6d5..c6db6cb6e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/AddDonePartitionAction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/AddDonePartitionAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
@@ -25,13 +25,12 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import java.io.IOException;
import java.util.LinkedHashMap;
-import java.util.Map.Entry;
+import java.util.Map;
import static
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFromPath;
/** A {@link PartitionMarkDoneAction} which add ".done" partition. */
public class AddDonePartitionAction implements PartitionMarkDoneAction {
-
private final MetastoreClient metastoreClient;
public AddDonePartitionAction(MetastoreClient metastoreClient) {
@@ -41,12 +40,12 @@ public class AddDonePartitionAction implements
PartitionMarkDoneAction {
@Override
public void markDone(String partition) throws Exception {
LinkedHashMap<String, String> doneSpec =
extractPartitionSpecFromPath(new Path(partition));
- Entry<String, String> lastField = tailEntry(doneSpec);
+ Map.Entry<String, String> lastField = tailEntry(doneSpec);
doneSpec.put(lastField.getKey(), lastField.getValue() + ".done");
metastoreClient.addPartition(doneSpec);
}
- private Entry<String, String> tailEntry(LinkedHashMap<String, String>
partitionSpec) {
+ private Map.Entry<String, String> tailEntry(LinkedHashMap<String, String>
partitionSpec) {
return Iterators.getLast(partitionSpec.entrySet().iterator());
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
similarity index 97%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
rename to
paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
index a6dc0bd1e..a5ebe3405 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/MarkPartitionDoneEventAction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/MarkPartitionDoneEventAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.metastore.MetastoreClient;
@@ -28,6 +28,7 @@ import static
org.apache.paimon.utils.PartitionPathUtils.extractPartitionSpecFro
/** A {@link PartitionMarkDoneAction} which add mark
"PartitionEventType.LOAD_DONE". */
public class MarkPartitionDoneEventAction implements PartitionMarkDoneAction {
+
private final MetastoreClient metastoreClient;
public MarkPartitionDoneEventAction(MetastoreClient metastoreClient) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
similarity index 96%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
rename to
paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index 2177f7d0f..f79992efb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneAction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.metastore.MetastoreClient;
@@ -28,7 +28,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
-import static
org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
similarity index 79%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
rename to
paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
index ac97b6f14..7c4ec375a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneAction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/actions/SuccessFileMarkDoneAction.java
@@ -16,10 +16,16 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.actions;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.file.SuccessFile;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
/** A {@link PartitionMarkDoneAction} which create "_SUCCESS" file. */
public class SuccessFileMarkDoneAction implements PartitionMarkDoneAction {
@@ -49,6 +55,16 @@ public class SuccessFileMarkDoneAction implements
PartitionMarkDoneAction {
fileIO.overwriteFileUtf8(successPath, successFile.toJson());
}
+    /**
+     * Reads the file at {@code path} as UTF-8 text and parses it with {@link
+     * SuccessFile#fromJson}.
+     *
+     * @param fileIO file system accessor used to read the file
+     * @param path location of the success file
+     * @return the parsed {@link SuccessFile}, or {@code null} when a {@link
+     *     FileNotFoundException} is raised while reading or parsing
+     * @throws IOException on any other I/O failure
+     */
+    @Nullable
+    public static SuccessFile safelyFromPath(FileIO fileIO, Path path) throws IOException {
+        SuccessFile parsed;
+        try {
+            parsed = SuccessFile.fromJson(fileIO.readFileUtf8(path));
+        } catch (FileNotFoundException missing) {
+            // A missing file is reported as "no success file" rather than as an error.
+            parsed = null;
+        }
+        return parsed;
+    }
+
@Override
public void close() {}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
b/paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
similarity index 98%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
rename to
paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
index 4d3656e6f..39a202ccc 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/SuccessFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/partition/file/SuccessFile.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.partition.file;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 4c5d1f762..b42de765e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -357,25 +357,6 @@ public class FlinkConnectorOptions {
"You can specify time interval for partition, for
example, "
+ "daily partition is '1 d', hourly
partition is '1 h'.");
- public static final ConfigOption<String> PARTITION_MARK_DONE_ACTION =
- key("partition.mark-done-action")
- .stringType()
- .defaultValue("success-file")
- .withDescription(
- Description.builder()
- .text(
- "Action to mark a partition done
is to notify the downstream application that the partition"
- + " has finished writing,
the partition is ready to be read.")
- .linebreak()
- .text("1. 'success-file': add '_success'
file to directory.")
- .linebreak()
- .text(
- "2. 'done-partition': add
'xxx.done' partition to metastore.")
- .linebreak()
- .text(
- "Both can be configured at the
same time: 'done-partition,success-file'.")
- .build());
-
public static final ConfigOption<Boolean>
PARTITION_MARK_DONE_WHEN_END_INPUT =
ConfigOptions.key("partition.end-input-to-done")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index f323c5509..9fd906ee4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.PartitionPathUtils;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index d846b25ee..d70cccf6b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 6a92a0548..ac5895767 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.IOUtils;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
index 8e606b9a6..9c97151ec 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkBatchJobPartitionMarkdoneITCase.java
@@ -20,10 +20,10 @@ package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.sink.partition.SuccessFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index f2f163d6e..e6c2c8678 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -20,8 +20,8 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.flink.sink.partition.SuccessFile;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.types.DataType;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
index cff4479d9..5338b3886 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/AddDonePartitionActionTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink.partition;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.partition.actions.AddDonePartitionAction;
import org.junit.jupiter.api.Test;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
index 5a6818b25..95be02540 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/SuccessFileMarkDoneActionTest.java
@@ -20,6 +20,8 @@ package org.apache.paimon.flink.sink.partition;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.partition.actions.SuccessFileMarkDoneAction;
+import org.apache.paimon.partition.file.SuccessFile;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 36d508266..f143cf7b6 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -26,6 +26,7 @@ import org.apache.paimon.spark.procedure.DeleteTagProcedure;
import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
import org.apache.paimon.spark.procedure.FastForwardProcedure;
+import org.apache.paimon.spark.procedure.MarkPartitionDoneProcedure;
import org.apache.paimon.spark.procedure.MigrateFileProcedure;
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
@@ -70,6 +71,7 @@ public class SparkProcedures {
procedureBuilders.put("repair", RepairProcedure::builder);
procedureBuilders.put("fast_forward", FastForwardProcedure::builder);
procedureBuilders.put("reset_consumer",
ResetConsumerProcedure::builder);
+ procedureBuilders.put("mark_partition_done",
MarkPartitionDoneProcedure::builder);
return procedureBuilders.build();
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
new file mode 100644
index 000000000..ff064e914
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.PartitionPathUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.List;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Partition mark done procedure. Runs every {@link PartitionMarkDoneAction} created for the
+ * target table on each of the given partitions. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.mark_partition_done('tableId', 'partition1;partition2')
+ * </code></pre>
+ */
+public class MarkPartitionDoneProcedure extends BaseProcedure {
+
+    /** Positional parameters: the table identifier, then the ';'-separated partition specs. */
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.required("partitions", StringType)
+            };
+
+    /** Output schema: a single nullable boolean column named "result". */
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    protected MarkPartitionDoneProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    /**
+     * Marks the requested partitions of the target table as done.
+     *
+     * @param args row holding the table identifier at position 0 and the ';'-separated partition
+     *     specs at position 1
+     * @return a single row containing {@code true} once every action has run for every partition
+     */
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        String partitionStr = args.getString(1);
+        String[] partitions = partitionStr.split(";");
+
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    checkArgument(
+                            table instanceof FileStoreTable,
+                            "Only FileStoreTable supports mark_partition_done procedure. The table type is '%s'.",
+                            table.getClass().getName());
+
+                    FileStoreTable fileStoreTable = (FileStoreTable) table;
+                    CoreOptions coreOptions = fileStoreTable.coreOptions();
+                    List<PartitionMarkDoneAction> actions =
+                            PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
+
+                    // Close the actions even when path generation or marking fails, so that
+                    // whatever they hold is not leaked on the error path.
+                    try {
+                        List<String> partitionPaths =
+                                PartitionPathUtils.generatePartitionPaths(
+                                        getPartitions(partitions),
+                                        fileStoreTable.store().partitionType());
+                        markDone(partitionPaths, actions);
+                    } finally {
+                        IOUtils.closeAllQuietly(actions);
+                    }
+
+                    return new InternalRow[] {newInternalRow(true)};
+                });
+    }
+
+    /**
+     * Applies every action to every partition, in the given order.
+     *
+     * @param partitions partition paths to mark done
+     * @param actions actions to run for each partition
+     * @throws RuntimeException wrapping the first exception raised by an action; the message names
+     *     the partition that failed
+     */
+    public static void markDone(List<String> partitions, List<PartitionMarkDoneAction> actions) {
+        for (String partition : partitions) {
+            try {
+                for (PartitionMarkDoneAction action : actions) {
+                    action.markDone(partition);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        "Failed to mark partition '" + partition + "' as done.", e);
+            }
+        }
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<MarkPartitionDoneProcedure>() {
+            @Override
+            public MarkPartitionDoneProcedure doBuild() {
+                return new MarkPartitionDoneProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "MarkPartitionDoneProcedure";
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
new file mode 100644
index 000000000..8abc7ddfd
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.fs.Path
+import org.apache.paimon.partition.file.SuccessFile
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+/** Checks that the Spark `mark_partition_done` procedure writes a `_SUCCESS` file per partition. */
+class MarkPartitionDoneProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon procedure: mark_partition_done test") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id STRING, name STRING, day STRING)
+                 |USING PAIMON
+                 |PARTITIONED BY (day)
+                 |TBLPROPERTIES (
+                 |'primary-key'='day,id',
+                 |'partition.mark-done-action'='success-file')
+                 |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES ('1', 'a', '2024-07-13')")
+    spark.sql(s"INSERT INTO T VALUES ('2', 'b', '2024-07-14')")
+
+    // Mark both partitions done in a single call; the procedure reports one `true` row.
+    val callStatement =
+      "CALL paimon.sys.mark_partition_done(" +
+        "table => 'test.T', partitions => 'day=2024-07-13;day=2024-07-14')"
+    checkAnswer(spark.sql(callStatement), Row(true) :: Nil)
+
+    val table = loadTable("T")
+
+    // Each marked partition directory must now contain a readable success file.
+    Seq("day=2024-07-13/_SUCCESS", "day=2024-07-14/_SUCCESS").foreach {
+      relativePath =>
+        val successPath = new Path(table.location, relativePath)
+        val successFile = SuccessFile.safelyFromPath(table.fileIO, successPath)
+        assertThat(successFile).isNotNull
+    }
+  }
+
+}