This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 737c3435f [flink] Flink support expire partitions procedure. (#3486)
737c3435f is described below
commit 737c3435ffa49aafe997a36bbb7ead824c291cff
Author: HunterXHunter <[email protected]>
AuthorDate: Wed Jun 19 17:42:18 2024 +0800
[flink] Flink support expire partitions procedure. (#3486)
---
docs/content/flink/procedures.md | 18 ++++++
.../apache/paimon/operation/PartitionExpire.java | 2 +-
.../ProcedurePositionalArgumentsITCase.java | 32 +++++++++++
.../flink/procedure/ExpirePartitionsProcedure.java | 67 ++++++++++++++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../procedure/ExpirePartitionsProcedureITCase.java | 61 ++++++++++++++++++++
6 files changed, 180 insertions(+), 1 deletion(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index a1a8bfe37..cfa905628 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -238,6 +238,24 @@ All available procedures are listed below.
CALL sys.expire_snapshots(`table` => 'default.T', older_than =>
'2024-01-01 12:00:00', max_deletes => 10)<br/><br/>
</td>
</tr>
+<tr>
+ <td>expire_partitions</td>
+ <td>
+ CALL sys.expire_partitions(table, expiration_time,
timestamp_formatter)<br/><br/>
+ </td>
+ <td>
+ To expire partitions. Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>expiration_time: the expiration interval of a partition. A
partition will be expired if its lifetime exceeds this value. Partition time
is extracted from the partition value.</li>
+ <li>timestamp_formatter: the formatter used to parse the partition time from
the partition value string, e.g. 'yyyy-MM-dd'.</li>
+ </td>
+ <td>
+ -- for Flink 1.18<br/><br/>
+ CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd')<br/><br/>
+ -- for Flink 1.19 and later<br/><br/>
+ CALL sys.expire_partitions(`table` => 'default.T', expiration_time =>
'1 d', timestamp_formatter => 'yyyy-MM-dd')<br/><br/>
+ </td>
+ </tr>
<tr>
<td>repair</td>
<td>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 94a35cb54..f99ff57af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -87,7 +87,7 @@ public class PartitionExpire {
@VisibleForTesting
void expire(LocalDateTime now, long commitIdentifier) {
- if (now.isAfter(lastCheck.plus(checkInterval))) {
+ if (checkInterval.isZero() ||
now.isAfter(lastCheck.plus(checkInterval))) {
doExpire(now.minus(expirationTime), commitIdentifier);
lastCheck = now;
}
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index d47947caa..443b60003 100644
---
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -19,9 +19,15 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
/** Ensure that the legacy multiply overloaded CALL with positional arguments
can be invoked. */
@@ -48,4 +54,30 @@ public class ProcedurePositionalArgumentsITCase extends
CatalogITCaseBase {
assertThatCode(() -> sql("CALL sys.compact('default.T', '', '', '',
'sink.parallelism=1')"))
.doesNotThrowAnyException();
}
+
+ @Test
+ public void testExpirePartitionsProcedure() throws Exception { // positional-argument form of expire_partitions (the Flink 1.18 syntax)
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+ sql("INSERT INTO T VALUES ('1', '2024-06-01')"); // past partition, expected to be expired
+ sql("INSERT INTO T VALUES ('2', '9024-06-01')"); // far-future partition, expected to survive
+ assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01",
"2:9024-06-01");
+ sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd')"); // positional args: table, expiration_time, timestamp_formatter
+ assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
+ }
+
+ private List<String> read(FileStoreTable table) throws IOException { // reads every row of a full scan as "k:dt" strings
+ List<String> ret = new ArrayList<>();
+ table.newRead()
+ .createReader(table.newScan().plan().splits())
+ .forEachRemaining(row -> ret.add(row.getString(0) + ":" +
row.getString(1)));
+ return ret;
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
new file mode 100644
index 000000000..6e21b3522
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.TimeUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.time.Duration;
+
+/**
+ * A procedure to expire partitions.
+ *
+ * <p>Registered under the identifier {@code expire_partitions}. Each call builds a one-off
+ * {@link PartitionExpire} for the target table and runs it once. Partitions whose time (parsed
+ * from the partition value with {@code timestamp_formatter}) is older than
+ * {@code expiration_time} are expired.
+ */
+public class ExpirePartitionsProcedure extends ProcedureBase {
+ @Override
+ public String identifier() {
+ return "expire_partitions";
+ }
+ /**
+ * Expires the partitions of {@code tableId} that are older than {@code expirationTime}.
+ *
+ * @param procedureContext the Flink procedure context (not used by this implementation)
+ * @param tableId the target table identifier, e.g. {@code default.T}
+ * @param expirationTime partition lifetime, parsed with {@code TimeUtils.parseDuration},
+ * e.g. {@code 1 d}
+ * @param timestampFormatter pattern used to read the partition time from the partition
+ * value, e.g. {@code yyyy-MM-dd}
+ * @return always an empty array
+ * @throws Catalog.TableNotExistException presumably raised by {@code table(tableId)} when the
+ * table does not exist (TODO confirm against ProcedureBase)
+ */
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "expiration_time", type =
@DataTypeHint(value = "STRING")),
+ @ArgumentHint(name = "timestamp_formatter", type =
@DataTypeHint("STRING"))
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String expirationTime,
+ String timestampFormatter)
+ throws Catalog.TableNotExistException {
+ FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); // NOTE(review): ClassCastException if the table is not a FileStoreTable
+ FileStore fileStore = fileStoreTable.store();
+ PartitionExpire partitionExpire =
+ new PartitionExpire(
+ fileStore.partitionType(),
+ TimeUtils.parseDuration(expirationTime),
+ Duration.ofMillis(0L), // zero check interval: with the isZero() check in PartitionExpire#expire, expiration runs on every call
+ null, // NOTE(review): confirm which PartitionExpire constructor argument is left unset here
+ timestampFormatter,
+ fileStore.newScan(),
+ fileStore.newCommit("")); // presumably the commit user; NOTE(review): this commit is never closed, so check whether it must be
+ partitionExpire.expire(Long.MAX_VALUE); // presumably the commit identifier for this one-off commit (TODO confirm)
+ return new String[] {};
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 192be6c14..847390dac 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -46,6 +46,7 @@ org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
+org.apache.paimon.flink.procedure.ExpirePartitionsProcedure
org.apache.paimon.flink.procedure.privilege.InitFileBasedPrivilegeProcedure
org.apache.paimon.flink.procedure.privilege.CreatePrivilegedUserProcedure
org.apache.paimon.flink.procedure.privilege.DropPrivilegedUserProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
new file mode 100644
index 000000000..348ec36a5
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * IT Case for {@link ExpirePartitionsProcedure}.
+ *
+ * <p>Invokes the procedure with Flink named arguments. The positional-argument form is
+ * exercised by {@code ProcedurePositionalArgumentsITCase} in the Flink 1.18 module.
+ */
+public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
+ /**
+ * Writes one row into a past partition (2024-06-01) and one into a far-future partition
+ * (9024-06-01). It then expires partitions older than one day and checks that only the
+ * far-future partition remains.
+ */
+ @Test
+ public void testExpirePartitionsProcedure() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+ sql("INSERT INTO T VALUES ('1', '2024-06-01')");
+ sql("INSERT INTO T VALUES ('2', '9024-06-01')");
+ assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01",
"2:9024-06-01");
+ sql(
+ "CALL sys.expire_partitions(`table` => 'default.T',
expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd')");
+ assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
+ }
+ /** Reads every row of a full scan of {@code table} as {@code "k:dt"} strings. */
+ private List<String> read(FileStoreTable table) throws IOException {
+ List<String> ret = new ArrayList<>();
+ table.newRead()
+ .createReader(table.newScan().plan().splits())
+ .forEachRemaining(row -> ret.add(row.getString(0) + ":" +
row.getString(1)));
+ return ret;
+ }
+}