This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5088d64e7 [flink] Support specifying time-pattern in ExpairePartition 
(#3909)
5088d64e7 is described below

commit 5088d64e7a261137aeab0316aa42169be12b48a9
Author: herefree <[email protected]>
AuthorDate: Tue Aug 6 16:41:30 2024 +0800

    [flink] Support specifying time-pattern in ExpairePartition (#3909)
---
 docs/content/flink/procedures.md                   |  6 +-
 .../ProcedurePositionalArgumentsITCase.java        |  2 +-
 .../apache/paimon/flink/action/ActionFactory.java  |  1 +
 .../flink/action/ExpirePartitionsAction.java       |  2 +
 .../action/ExpirePartitionsActionFactory.java      |  5 +-
 .../flink/procedure/ExpirePartitionsProcedure.java |  6 ++
 .../flink/action/ExpirePartitionsActionITCase.java | 64 ++++++++++++++++++----
 .../procedure/ExpirePartitionsProcedureITCase.java | 50 +++++++++++++++++
 8 files changed, 122 insertions(+), 14 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 2ef8f800f..3fae9b405 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -258,13 +258,15 @@ All available procedures are listed below.
             <li>table: the target table identifier. Cannot be empty.</li>
             <li>expiration_time: the expiration interval of a partition. A 
partition will be expired if it‘s lifetime is over this value. Partition time 
is extracted from the partition value.</li>
             <li>timestamp_formatter: the formatter to format timestamp from 
string.</li>
+            <li>timestamp_pattern: the pattern to get a timestamp from 
partitions.</li>
             <li>expire_strategy: specifies the expiration strategy for 
partition expiration, possible values: 'values-time' or 'update-time' , 
'values-time' as default.</li>
       </td>
       <td>
          -- for Flink 1.18<br/><br/>
-         CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 
'values-time')<br/><br/>
+         CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt', 
'values-time')<br/><br/>
          -- for Flink 1.19 and later<br/><br/>
-         CALL sys.expire_partitions(`table` => 'default.T', expiration_time => 
'1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 
'values-time')<br/><br/>
+         CALL sys.expire_partitions(`table` => 'default.T', expiration_time => 
'1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy => 
'values-time')<br/>
+         CALL sys.expire_partitions(`table` => 'default.T', expiration_time => 
'1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt 
$hm', expire_strategy => 'values-time')<br/><br/>
       </td>
    </tr>
     <tr>
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 5e1acca3a..1db04aa1d 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -74,7 +74,7 @@ public class ProcedurePositionalArgumentsITCase extends 
CatalogITCaseBase {
         sql("INSERT INTO T VALUES ('1', '2024-06-01')");
         sql("INSERT INTO T VALUES ('2', '9024-06-01')");
         assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01", 
"2:9024-06-01");
-        sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 
'values-time')");
+        sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', 
'$dt', 'values-time')");
         assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index bcc53aa92..aeacc8ce6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -56,6 +56,7 @@ public interface ActionFactory extends Factory {
     String EXPIRATIONTIME = "expiration_time";
     String TIMESTAMPFORMATTER = "timestamp_formatter";
     String EXPIRE_STRATEGY = "expire_strategy";
+    String TIMESTAMP_PATTERN = "timestamp_pattern";
 
     Optional<Action> create(MultipleParameterToolAdapter params);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 5a41ba0f9..9528bc137 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -43,6 +43,7 @@ public class ExpirePartitionsAction extends TableActionBase {
             Map<String, String> catalogConfig,
             String expirationTime,
             String timestampFormatter,
+            String timestampPattern,
             String expireStrategy) {
         super(warehouse, databaseName, tableName, catalogConfig);
         if (!(table instanceof FileStoreTable)) {
@@ -54,6 +55,7 @@ public class ExpirePartitionsAction extends TableActionBase {
         Map<String, String> map = new HashMap<>();
         map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), 
expireStrategy);
         map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), 
timestampFormatter);
+        map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), 
timestampPattern);
 
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStore<?> fileStore = fileStoreTable.store();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
index c343d3b7f..3d0dfc265 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
@@ -42,6 +42,7 @@ public class ExpirePartitionsActionFactory implements 
ActionFactory {
         String expirationTime = params.get(EXPIRATIONTIME);
         String timestampFormatter = params.get(TIMESTAMPFORMATTER);
         String expireStrategy = params.get(EXPIRE_STRATEGY);
+        String timestampPattern = params.get(TIMESTAMP_PATTERN);
 
         Map<String, String> catalogConfig = optionalConfigMap(params, 
CATALOG_CONF);
 
@@ -53,6 +54,7 @@ public class ExpirePartitionsActionFactory implements 
ActionFactory {
                         catalogConfig,
                         expirationTime,
                         timestampFormatter,
+                        timestampPattern,
                         expireStrategy));
     }
 
@@ -64,7 +66,8 @@ public class ExpirePartitionsActionFactory implements 
ActionFactory {
         System.out.println("Syntax:");
         System.out.println(
                 "  expire_partitions --warehouse <warehouse_path> --database 
<database_name> "
-                        + "--table <table_name> --tag_name <tag_name> 
--expiration_time <expiration_time> --timestamp_formatter 
<timestamp_formatter>");
+                        + "--table <table_name> --tag_name <tag_name> 
--expiration_time <expiration_time> --timestamp_formatter <timestamp_formatter>"
+                        + "[--timestamp_pattern <timestamp_pattern>] 
[--expire_strategy <expire_strategy>]");
         System.out.println();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index ae9eb139e..abbf4f486 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -55,6 +55,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase 
{
                         name = "timestamp_formatter",
                         type = @DataTypeHint("STRING"),
                         isOptional = true),
+                @ArgumentHint(
+                        name = "timestamp_pattern",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
                 @ArgumentHint(
                         name = "expire_strategy",
                         type = @DataTypeHint("STRING"),
@@ -65,6 +69,7 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
             String tableId,
             String expirationTime,
             String timestampFormatter,
+            String timestampPattern,
             String expireStrategy)
             throws Catalog.TableNotExistException {
         FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
@@ -72,6 +77,7 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
         Map<String, String> map = new HashMap<>();
         map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), 
expireStrategy);
         map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), 
timestampFormatter);
+        map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), 
timestampPattern);
 
         PartitionExpire partitionExpire =
                 new PartitionExpire(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
index 55a36637e..9077204c0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
@@ -43,9 +43,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ExpirePartitionsActionITCase extends ActionITCaseBase {
 
     private static final DataType[] FIELD_TYPES =
-            new DataType[] {DataTypes.STRING(), DataTypes.STRING()};
+            new DataType[] {DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()};
 
-    private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new 
String[] {"k", "v"});
+    private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new 
String[] {"k", "dt", "hm"});
 
     @BeforeEach
     public void setUp() {
@@ -58,7 +58,7 @@ public class ExpirePartitionsActionITCase extends 
ActionITCaseBase {
         TableScan.Plan plan = table.newReadBuilder().newScan().plan();
         List<String> actual = getResult(table.newReadBuilder().newRead(), 
plan.splits(), ROW_TYPE);
         List<String> expected;
-        expected = Arrays.asList("+I[1, 2024-01-01]", "+I[2, 2024-12-31]");
+        expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 
9999-09-20, 02:00]");
 
         assertThat(actual).isEqualTo(expected);
 
@@ -84,7 +84,42 @@ public class ExpirePartitionsActionITCase extends 
ActionITCaseBase {
         plan = table.newReadBuilder().newScan().plan();
         actual = getResult(table.newReadBuilder().newRead(), plan.splits(), 
ROW_TYPE);
 
-        expected = Arrays.asList("+I[2, 2024-12-31]");
+        expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");
+
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    @Test
+    public void testExpirePartitionsActionWithTimePartition() throws Exception 
{
+        FileStoreTable table = prepareTable();
+        TableScan.Plan plan = table.newReadBuilder().newScan().plan();
+        List<String> actual = getResult(table.newReadBuilder().newRead(), 
plan.splits(), ROW_TYPE);
+        List<String> expected;
+        expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2, 
9999-09-20, 02:00]");
+
+        assertThat(actual).isEqualTo(expected);
+
+        createAction(
+                        ExpirePartitionsAction.class,
+                        "expire_partitions",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        tableName,
+                        "--expiration_time",
+                        "1 d",
+                        "--timestamp_formatter",
+                        "yyyy-MM-dd HH:mm",
+                        "--timestamp_pattern",
+                        "$dt $hm")
+                .run();
+
+        plan = table.newReadBuilder().newScan().plan();
+        actual = getResult(table.newReadBuilder().newRead(), plan.splits(), 
ROW_TYPE);
+
+        expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");
 
         assertThat(actual).isEqualTo(expected);
     }
@@ -94,13 +129,14 @@ public class ExpirePartitionsActionITCase extends 
ActionITCaseBase {
 
         RowType rowType =
                 RowType.of(
-                        new DataType[] {DataTypes.STRING(), 
DataTypes.STRING()},
-                        new String[] {"k", "v"});
-        String[] pk = {"k", "v"};
+                        new DataType[] {DataTypes.STRING(), 
DataTypes.STRING(), DataTypes.STRING()},
+                        new String[] {"k", "dt", "hm"});
+        String[] pk = {"k", "dt", "hm"};
+        String[] partitions = {"dt", "hm"};
         FileStoreTable table =
                 createFileStoreTable(
                         rowType,
-                        Collections.singletonList("v"),
+                        new ArrayList<>(Arrays.asList(partitions)),
                         new ArrayList<>(Arrays.asList(pk)),
                         Collections.singletonList("k"),
                         Collections.emptyMap());
@@ -110,8 +146,16 @@ public class ExpirePartitionsActionITCase extends 
ActionITCaseBase {
         commit = writeBuilder.newCommit();
 
         // 3 snapshots
-        writeData(rowData(BinaryString.fromString("1"), 
BinaryString.fromString("2024-01-01")));
-        writeData(rowData(BinaryString.fromString("2"), 
BinaryString.fromString("2024-12-31")));
+        writeData(
+                rowData(
+                        BinaryString.fromString("1"),
+                        BinaryString.fromString("2024-01-01"),
+                        BinaryString.fromString("01:00")));
+        writeData(
+                rowData(
+                        BinaryString.fromString("2"),
+                        BinaryString.fromString("9999-09-20"),
+                        BinaryString.fromString("02:00")));
 
         return table;
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index d476096f0..71a6dc466 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -297,6 +297,56 @@ public class ExpirePartitionsProcedureITCase extends 
CatalogITCaseBase {
         assertThat(read(table, consumerReadResult)).isEmpty();
     }
 
+    @Test
+    public void testPartitionExpireWithTimePartition() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " k STRING,"
+                        + " dt STRING,"
+                        + " hm STRING,"
+                        + " PRIMARY KEY (k, dt, hm) NOT ENFORCED"
+                        + ") PARTITIONED BY (dt, hm) WITH ("
+                        + " 'bucket' = '1'"
+                        + ")");
+        FileStoreTable table = paimonTable("T");
+        // Test there are no expired partitions.
+        assertThat(
+                        callExpirePartitions(
+                                "CALL sys.expire_partitions("
+                                        + "`table` => 'default.T'"
+                                        + ", expiration_time => '1 d'"
+                                        + ", timestamp_pattern => '$dt $hm'"
+                                        + ", timestamp_formatter => 
'yyyy-MM-dd HH:mm')"))
+                .containsExactlyInAnyOrder("No expired partitions.");
+
+        sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')");
+        sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')");
+        // This partition never expires.
+        sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')");
+
+        Function<InternalRow, String> consumerReadResult =
+                (InternalRow row) ->
+                        row.getString(0) + ":" + row.getString(1) + ":" + 
row.getString(2);
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder(
+                        "1:2024-06-01:01:00",
+                        "2:2024-06-02:02:00",
+                        "Never-expire:9999-09-09:99:99");
+
+        // Show a list of expired partitions.
+        assertThat(
+                        callExpirePartitions(
+                                "CALL sys.expire_partitions("
+                                        + "`table` => 'default.T'"
+                                        + ", expiration_time => '1 d'"
+                                        + ", timestamp_pattern => '$dt $hm'"
+                                        + ", timestamp_formatter => 
'yyyy-MM-dd HH:mm')"))
+                .containsExactlyInAnyOrder("dt=2024-06-01, hm=01:00", 
"dt=2024-06-02, hm=02:00");
+
+        assertThat(read(table, consumerReadResult))
+                .containsExactlyInAnyOrder("Never-expire:9999-09-09:99:99");
+    }
+
     /** Return a list of expired partitions. */
     public List<String> callExpirePartitions(String callSql) {
         return sql(callSql).stream()

Reply via email to