This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 071dbf2ae [flink] Introduce ExpirePartitionsAction in flink (#3586)
071dbf2ae is described below

commit 071dbf2ae237db1e95ff9adb1de7dcd3c0adedc2
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Jun 24 17:41:20 2024 +0800

    [flink] Introduce ExpirePartitionsAction in flink (#3586)
---
 .../apache/paimon/flink/action/ActionFactory.java  |   2 +
 .../flink/action/ExpirePartitionsAction.java       |  79 ++++++++++++++
 .../action/ExpirePartitionsActionFactory.java      |  68 ++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../flink/action/ExpirePartitionsActionITCase.java | 118 +++++++++++++++++++++
 5 files changed, 268 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index e6ac1dfaa..aeeef29c9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -53,6 +53,8 @@ public interface ActionFactory extends Factory {
     String CATALOG_CONF = "catalog_conf";
     String TABLE_CONF = "table_conf";
     String PARTITION = "partition";
+    String EXPIRATIONTIME = "expiration_time";
+    String TIMESTAMPFORMATTER = "timestamp_formatter";
 
     Optional<Action> create(MultipleParameterToolAdapter params);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
new file mode 100644
index 000000000..7d1c4dd5e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.FileStore;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.TimeUtils;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.catalog.CatalogUtils.table;
+
+/**
+ * Expire partitions action for Flink.
+ *
+ * <p>The constructor eagerly builds a {@link PartitionExpire} for the target table; {@link
+ * #run()} then triggers it once.
+ */
+public class ExpirePartitionsAction extends TableActionBase {
+    private final String expirationTime;
+    private final String timestampFormatter;
+    // Assigned exactly once in the constructor, hence final.
+    private final PartitionExpire partitionExpire;
+
+    /**
+     * Creates the action and builds the underlying {@link PartitionExpire}.
+     *
+     * @param warehouse forwarded to {@link TableActionBase}
+     * @param databaseName forwarded to {@link TableActionBase}
+     * @param tableName forwarded to {@link TableActionBase}
+     * @param catalogConfig forwarded to {@link TableActionBase}
+     * @param expirationTime duration string, parsed with {@link TimeUtils#parseDuration}
+     * @param timestampFormatter formatter string handed to {@link PartitionExpire} unchanged
+     * @throws UnsupportedOperationException if the resolved table is not a {@link
+     *     FileStoreTable}
+     */
+    public ExpirePartitionsAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig,
+            String expirationTime,
+            String timestampFormatter) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+        if (!(table instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only FileStoreTable supports expire_partitions action. The table type is '%s'.",
+                            table.getClass().getName()));
+        }
+        this.expirationTime = expirationTime;
+        this.timestampFormatter = timestampFormatter;
+
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        FileStore fileStore = fileStoreTable.store();
+        // NOTE(review): the zero duration and the null argument are presumably the check
+        // interval and the timestamp pattern - confirm against the PartitionExpire constructor.
+        // The metastore client is only created when the catalog environment provides a factory;
+        // otherwise null is passed.
+        this.partitionExpire =
+                new PartitionExpire(
+                        fileStore.partitionType(),
+                        TimeUtils.parseDuration(expirationTime),
+                        Duration.ofMillis(0L),
+                        null,
+                        timestampFormatter,
+                        fileStore.newScan(),
+                        fileStore.newCommit(""),
+                        Optional.ofNullable(
+                                        fileStoreTable
+                                                .catalogEnvironment()
+                                                .metastoreClientFactory())
+                                .map(MetastoreClient.Factory::create)
+                                .orElse(null));
+    }
+
+    /**
+     * Runs the partition expiration once.
+     *
+     * <p>NOTE(review): {@code Long.MAX_VALUE} is presumably the commit identifier used for the
+     * expire commit - confirm against {@code PartitionExpire#expire}.
+     */
+    @Override
+    public void run() throws Exception {
+        this.partitionExpire.expire(Long.MAX_VALUE);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
new file mode 100644
index 000000000..de56fc194
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Factory to create {@link ExpirePartitionsAction}.
+ *
+ * <p>Requires the {@code expiration_time} and {@code timestamp_formatter} arguments in addition
+ * to the table path; {@code catalog_conf} is optional.
+ */
+public class ExpirePartitionsActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "expire_partitions";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Builds an {@link ExpirePartitionsAction} from command-line style parameters.
+     *
+     * @param params parsed action arguments
+     * @return the configured action, never empty
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+
+        checkRequiredArgument(params, EXPIRATIONTIME);
+        checkRequiredArgument(params, TIMESTAMPFORMATTER);
+        String expirationTime = params.get(EXPIRATIONTIME);
+        String timestampFormatter = params.get(TIMESTAMPFORMATTER);
+
+        Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+
+        return Optional.of(
+                new ExpirePartitionsAction(
+                        tablePath.f0,
+                        tablePath.f1,
+                        tablePath.f2,
+                        catalogConfig,
+                        expirationTime,
+                        timestampFormatter));
+    }
+
+    /** Prints the usage of the action; lists only the arguments that {@code create} reads. */
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"expire_partitions\" expires table partitions according to the given expiration time.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        // The action has no --tag_name argument; it was dropped from the usage line, and the
+        // optional --catalog_conf that create() reads was added.
+        System.out.println(
+                "  expire_partitions --warehouse <warehouse_path> --database <database_name> "
+                        + "--table <table_name> --expiration_time <expiration_time> "
+                        + "--timestamp_formatter <timestamp_formatter> "
+                        + "[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]");
+        System.out.println();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index c41539091..5a69314c3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -28,6 +28,7 @@ org.apache.paimon.flink.action.MigrateTableActionFactory
 org.apache.paimon.flink.action.MigrateDatabaseActionFactory
 org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
 org.apache.paimon.flink.action.QueryServiceActionFactory
+org.apache.paimon.flink.action.ExpirePartitionsActionFactory
 
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
new file mode 100644
index 000000000..55a36637e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * IT cases for {@link ExpirePartitionsAction}.
+ *
+ * <p>Writes two rows whose {@code v} column holds a date string, runs the {@code
+ * expire_partitions} action with a one-day expiration time, and checks that only the {@code
+ * 2024-12-31} row is still readable afterwards.
+ */
+public class ExpirePartitionsActionITCase extends ActionITCaseBase {
+
+    private static final DataType[] FIELD_TYPES =
+            new DataType[] {DataTypes.STRING(), DataTypes.STRING()};
+
+    private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new 
String[] {"k", "v"});
+
+    @BeforeEach
+    public void setUp() {
+        init(warehouse);
+    }
+
+    @Test
+    public void testExpirePartitionsAction() throws Exception {
+        FileStoreTable table = prepareTable();
+        TableScan.Plan plan = table.newReadBuilder().newScan().plan();
+        List<String> actual = getResult(table.newReadBuilder().newRead(), 
plan.splits(), ROW_TYPE);
+        List<String> expected;
+        expected = Arrays.asList("+I[1, 2024-01-01]", "+I[2, 2024-12-31]");
+
+        assertThat(actual).isEqualTo(expected);
+
+        createAction(
+                        ExpirePartitionsAction.class,
+                        "expire_partitions",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--expiration_time",
+                        "1 d",
+                        "--timestamp_formatter",
+                        "yyyy-MM-dd")
+                .run();
+        SnapshotManager snapshotManager = 
getFileStoreTable(tableName).snapshotManager();
+        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        assertThat(snapshot.id()).isEqualTo(3); // presumably 2 write snapshots + 1 from the expire action
+        
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+        plan = table.newReadBuilder().newScan().plan();
+        actual = getResult(table.newReadBuilder().newRead(), plan.splits(), 
ROW_TYPE);
+
+        expected = Arrays.asList("+I[2, 2024-12-31]");
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private FileStoreTable prepareTable() throws Exception {
+        init(warehouse); // NOTE(review): setUp() already calls init(warehouse) before each test
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        String[] pk = {"k", "v"};
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.singletonList("v"), // presumably the partition keys - confirm against createFileStoreTable
+                        new ArrayList<>(Arrays.asList(pk)),
+                        Collections.singletonList("k"),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 2 writes (presumably one snapshot each; the test expects the expire commit to be snapshot 3)
+        writeData(rowData(BinaryString.fromString("1"), 
BinaryString.fromString("2024-01-01")));
+        writeData(rowData(BinaryString.fromString("2"), 
BinaryString.fromString("2024-12-31")));
+
+        return table;
+    }
+}

Reply via email to