This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 737c3435f [flink] Flink support expire partitions procedure. (#3486)
737c3435f is described below

commit 737c3435ffa49aafe997a36bbb7ead824c291cff
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Jun 19 17:42:18 2024 +0800

    [flink] Flink support expire partitions procedure. (#3486)
---
 docs/content/flink/procedures.md                   | 18 ++++++
 .../apache/paimon/operation/PartitionExpire.java   |  2 +-
 .../ProcedurePositionalArgumentsITCase.java        | 32 +++++++++++
 .../flink/procedure/ExpirePartitionsProcedure.java | 67 ++++++++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  1 +
 .../procedure/ExpirePartitionsProcedureITCase.java | 61 ++++++++++++++++++++
 6 files changed, 180 insertions(+), 1 deletion(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index a1a8bfe37..cfa905628 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -238,6 +238,24 @@ All available procedures are listed below.
          CALL sys.expire_snapshots(`table` => 'default.T', older_than => 
'2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
       </td>
    </tr>
+<tr>
+      <td>expire_partitions</td>
+      <td>
+         CALL sys.expire_partitions(table, expiration_time, 
timestamp_formatter)<br/><br/>
+      </td>
+      <td>
+         To expire partitions. Argument:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>expiration_time: the expiration interval of a partition. A 
partition will be expired if its lifetime is over this value. Partition time 
is extracted from the partition value.</li>
+            <li>timestamp_formatter: the formatter to format timestamp from 
string.</li>
+      </td>
+      <td>
+         -- for Flink 1.18<br/><br/>
+         CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd')<br/><br/>
+         -- for Flink 1.19 and later<br/><br/>
+         CALL sys.expire_partitions(`table` => 'default.T', expiration_time => 
'1 d', timestamp_formatter => 'yyyy-MM-dd')<br/><br/>
+      </td>
+   </tr>
     <tr>
       <td>repair</td>
       <td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 94a35cb54..f99ff57af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -87,7 +87,7 @@ public class PartitionExpire {
 
     @VisibleForTesting
     void expire(LocalDateTime now, long commitIdentifier) {
-        if (now.isAfter(lastCheck.plus(checkInterval))) {
+        if (checkInterval.isZero() || 
now.isAfter(lastCheck.plus(checkInterval))) {
             doExpire(now.minus(expirationTime), commitIdentifier);
             lastCheck = now;
         }
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index d47947caa..443b60003 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -19,9 +19,15 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 
 /** Ensure that the legacy multiply overloaded CALL with positional arguments 
can be invoked. */
@@ -48,4 +54,30 @@ public class ProcedurePositionalArgumentsITCase extends 
CatalogITCaseBase {
         assertThatCode(() -> sql("CALL sys.compact('default.T', '', '', '', 
'sink.parallelism=1')"))
                 .doesNotThrowAnyException();
     }
+
+    @Test
+    public void testExpirePartitionsProcedure() throws Exception { // exercises the positional-argument CALL form
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+        sql("INSERT INTO T VALUES ('1', '2024-06-01')"); // partition the test expects to be expired
+        sql("INSERT INTO T VALUES ('2', '9024-06-01')"); // far-future partition the test expects to survive
+        assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", 
"2:9024-06-01");
+        sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd')"); // arguments in order: table, expiration_time, timestamp_formatter
+        assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01"); // only the far-future partition is still readable
+    }
+
+    private List<String> read(FileStoreTable table) throws IOException { // returns one string per row: column 0, a colon, column 1
+        List<String> ret = new ArrayList<>();
+        table.newRead()
+                .createReader(table.newScan().plan().splits()) // reader over the splits of a newly planned scan
+                .forEachRemaining(row -> ret.add(row.getString(0) + ":" + 
row.getString(1)));
+        return ret;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
new file mode 100644
index 000000000..6e21b3522
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.TimeUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.time.Duration;
+
+/** A procedure to expire partitions, invoked from SQL as {@code CALL sys.expire_partitions(...)}. */
+public class ExpirePartitionsProcedure extends ProcedureBase {
+    @Override
+    public String identifier() {
+        return "expire_partitions"; // the procedure name used after the sys. prefix in CALL statements
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "expiration_time", type = 
@DataTypeHint(value = "STRING")),
+                @ArgumentHint(name = "timestamp_formatter", type = 
@DataTypeHint("STRING"))
+            })
+    public String[] call( // builds a one-off PartitionExpire for the table, runs it once, returns an empty array
+            ProcedureContext procedureContext,
+            String tableId,
+            String expirationTime,
+            String timestampFormatter)
+            throws Catalog.TableNotExistException {
+        FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); // tableId is a string such as default.T
+        FileStore fileStore = fileStoreTable.store();
+        PartitionExpire partitionExpire =
+                new PartitionExpire(
+                        fileStore.partitionType(),
+                        TimeUtils.parseDuration(expirationTime), // duration text such as 1 d
+                        Duration.ofMillis(0L), // NOTE(review): presumably the check interval; PartitionExpire.expire proceeds unconditionally when checkInterval.isZero() - confirm
+                        null, // NOTE(review): meaning of this null argument is not visible here - confirm against the PartitionExpire constructor
+                        timestampFormatter, // pattern text such as yyyy-MM-dd
+                        fileStore.newScan(),
+                        fileStore.newCommit("")); // NOTE(review): empty string is presumably the commit user - confirm
+        partitionExpire.expire(Long.MAX_VALUE); // NOTE(review): Long.MAX_VALUE is presumably the commit identifier - confirm
+        return new String[] {}; // always an empty array; no per-partition result is reported
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 192be6c14..847390dac 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -46,6 +46,7 @@ org.apache.paimon.flink.procedure.MigrateFileProcedure
 org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
 org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.ExpirePartitionsProcedure
 org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
 org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
 org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
new file mode 100644
index 000000000..348ec36a5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link ExpirePartitionsProcedure}, calling it with named arguments. */
+public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testExpirePartitionsProcedure() throws Exception { // exercises the named-argument CALL form
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " PRIMARY KEY (k, dt) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt) WITH ("
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+        sql("INSERT INTO T VALUES ('1', '2024-06-01')"); // partition the test expects to be expired
+        sql("INSERT INTO T VALUES ('2', '9024-06-01')"); // far-future partition the test expects to survive
+        assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", 
"2:9024-06-01");
+        sql( // arguments passed by name: table, expiration_time, timestamp_formatter
+                "CALL sys.expire_partitions(`table` => 'default.T', 
expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')");
+        assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01"); // only the far-future partition is still readable
+    }
+
+    private List<String> read(FileStoreTable table) throws IOException { // returns one string per row: column 0, a colon, column 1
+        List<String> ret = new ArrayList<>();
+        table.newRead()
+                .createReader(table.newScan().plan().splits()) // reader over the splits of a newly planned scan
+                .forEachRemaining(row -> ret.add(row.getString(0) + ":" + 
row.getString(1)));
+        return ret;
+    }
+}

Reply via email to