This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5088d64e7 [flink] Support specifying time-pattern in ExpirePartitions
(#3909)
5088d64e7 is described below
commit 5088d64e7a261137aeab0316aa42169be12b48a9
Author: herefree <[email protected]>
AuthorDate: Tue Aug 6 16:41:30 2024 +0800
[flink] Support specifying time-pattern in ExpirePartitions (#3909)
---
docs/content/flink/procedures.md | 6 +-
.../ProcedurePositionalArgumentsITCase.java | 2 +-
.../apache/paimon/flink/action/ActionFactory.java | 1 +
.../flink/action/ExpirePartitionsAction.java | 2 +
.../action/ExpirePartitionsActionFactory.java | 5 +-
.../flink/procedure/ExpirePartitionsProcedure.java | 6 ++
.../flink/action/ExpirePartitionsActionITCase.java | 64 ++++++++++++++++++----
.../procedure/ExpirePartitionsProcedureITCase.java | 50 +++++++++++++++++
8 files changed, 122 insertions(+), 14 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 2ef8f800f..3fae9b405 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -258,13 +258,15 @@ All available procedures are listed below.
<li>table: the target table identifier. Cannot be empty.</li>
<li>expiration_time: the expiration interval of a partition. A
partition will be expired if its lifetime is over this value. Partition time
is extracted from the partition value.</li>
<li>timestamp_formatter: the formatter to format timestamp from
string.</li>
+ <li>timestamp_pattern: the pattern to get a timestamp from
partitions.</li>
<li>expire_strategy: specifies the expiration strategy for
partition expiration, possible values: 'values-time' or 'update-time',
'values-time' as default.</li>
</td>
<td>
-- for Flink 1.18<br/><br/>
- CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd',
'values-time')<br/><br/>
+ CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd', '$dt',
'values-time')<br/><br/>
-- for Flink 1.19 and later<br/><br/>
- CALL sys.expire_partitions(`table` => 'default.T', expiration_time =>
'1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy =>
'values-time')<br/><br/>
+ CALL sys.expire_partitions(`table` => 'default.T', expiration_time =>
'1 d', timestamp_formatter => 'yyyy-MM-dd', expire_strategy =>
'values-time')<br/>
+ CALL sys.expire_partitions(`table` => 'default.T', expiration_time =>
'1 d', timestamp_formatter => 'yyyy-MM-dd HH:mm', timestamp_pattern => '$dt
$hm', expire_strategy => 'values-time')<br/><br/>
</td>
</tr>
<tr>
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index 5e1acca3a..1db04aa1d 100644
---
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -74,7 +74,7 @@ public class ProcedurePositionalArgumentsITCase extends
CatalogITCaseBase {
sql("INSERT INTO T VALUES ('1', '2024-06-01')");
sql("INSERT INTO T VALUES ('2', '9024-06-01')");
assertThat(read(table)).containsExactlyInAnyOrder("1:2024-06-01",
"2:9024-06-01");
- sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd',
'values-time')");
+ sql("CALL sys.expire_partitions('default.T', '1 d', 'yyyy-MM-dd',
'$dt', 'values-time')");
assertThat(read(table)).containsExactlyInAnyOrder("2:9024-06-01");
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
index bcc53aa92..aeacc8ce6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionFactory.java
@@ -56,6 +56,7 @@ public interface ActionFactory extends Factory {
String EXPIRATIONTIME = "expiration_time";
String TIMESTAMPFORMATTER = "timestamp_formatter";
String EXPIRE_STRATEGY = "expire_strategy";
+ String TIMESTAMP_PATTERN = "timestamp_pattern";
Optional<Action> create(MultipleParameterToolAdapter params);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 5a41ba0f9..9528bc137 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -43,6 +43,7 @@ public class ExpirePartitionsAction extends TableActionBase {
Map<String, String> catalogConfig,
String expirationTime,
String timestampFormatter,
+ String timestampPattern,
String expireStrategy) {
super(warehouse, databaseName, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
@@ -54,6 +55,7 @@ public class ExpirePartitionsAction extends TableActionBase {
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(),
timestampFormatter);
+ map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(),
timestampPattern);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore<?> fileStore = fileStoreTable.store();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
index c343d3b7f..3d0dfc265 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsActionFactory.java
@@ -42,6 +42,7 @@ public class ExpirePartitionsActionFactory implements
ActionFactory {
String expirationTime = params.get(EXPIRATIONTIME);
String timestampFormatter = params.get(TIMESTAMPFORMATTER);
String expireStrategy = params.get(EXPIRE_STRATEGY);
+ String timestampPattern = params.get(TIMESTAMP_PATTERN);
Map<String, String> catalogConfig = optionalConfigMap(params,
CATALOG_CONF);
@@ -53,6 +54,7 @@ public class ExpirePartitionsActionFactory implements
ActionFactory {
catalogConfig,
expirationTime,
timestampFormatter,
+ timestampPattern,
expireStrategy));
}
@@ -64,7 +66,8 @@ public class ExpirePartitionsActionFactory implements
ActionFactory {
System.out.println("Syntax:");
System.out.println(
" expire_partitions --warehouse <warehouse_path> --database
<database_name> "
- + "--table <table_name> --tag_name <tag_name>
--expiration_time <expiration_time> --timestamp_formatter
<timestamp_formatter>");
+ + "--table <table_name> --tag_name <tag_name>
--expiration_time <expiration_time> --timestamp_formatter <timestamp_formatter>"
+ + "[--timestamp_pattern <timestamp_pattern>]
[--expire_strategy <expire_strategy>]");
System.out.println();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index ae9eb139e..abbf4f486 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -55,6 +55,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase
{
name = "timestamp_formatter",
type = @DataTypeHint("STRING"),
isOptional = true),
+ @ArgumentHint(
+ name = "timestamp_pattern",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
@ArgumentHint(
name = "expire_strategy",
type = @DataTypeHint("STRING"),
@@ -65,6 +69,7 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
String tableId,
String expirationTime,
String timestampFormatter,
+ String timestampPattern,
String expireStrategy)
throws Catalog.TableNotExistException {
FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
@@ -72,6 +77,7 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
Map<String, String> map = new HashMap<>();
map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(),
expireStrategy);
map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(),
timestampFormatter);
+ map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(),
timestampPattern);
PartitionExpire partitionExpire =
new PartitionExpire(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
index 55a36637e..9077204c0 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpirePartitionsActionITCase.java
@@ -43,9 +43,9 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ExpirePartitionsActionITCase extends ActionITCaseBase {
private static final DataType[] FIELD_TYPES =
- new DataType[] {DataTypes.STRING(), DataTypes.STRING()};
+ new DataType[] {DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING()};
- private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new
String[] {"k", "v"});
+ private static final RowType ROW_TYPE = RowType.of(FIELD_TYPES, new
String[] {"k", "dt", "hm"});
@BeforeEach
public void setUp() {
@@ -58,7 +58,7 @@ public class ExpirePartitionsActionITCase extends
ActionITCaseBase {
TableScan.Plan plan = table.newReadBuilder().newScan().plan();
List<String> actual = getResult(table.newReadBuilder().newRead(),
plan.splits(), ROW_TYPE);
List<String> expected;
- expected = Arrays.asList("+I[1, 2024-01-01]", "+I[2, 2024-12-31]");
+ expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2,
9999-09-20, 02:00]");
assertThat(actual).isEqualTo(expected);
@@ -84,7 +84,42 @@ public class ExpirePartitionsActionITCase extends
ActionITCaseBase {
plan = table.newReadBuilder().newScan().plan();
actual = getResult(table.newReadBuilder().newRead(), plan.splits(),
ROW_TYPE);
- expected = Arrays.asList("+I[2, 2024-12-31]");
+ expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");
+
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ @Test
+ public void testExpirePartitionsActionWithTimePartition() throws Exception
{
+ FileStoreTable table = prepareTable();
+ TableScan.Plan plan = table.newReadBuilder().newScan().plan();
+ List<String> actual = getResult(table.newReadBuilder().newRead(),
plan.splits(), ROW_TYPE);
+ List<String> expected;
+ expected = Arrays.asList("+I[1, 2024-01-01, 01:00]", "+I[2,
9999-09-20, 02:00]");
+
+ assertThat(actual).isEqualTo(expected);
+
+ createAction(
+ ExpirePartitionsAction.class,
+ "expire_partitions",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--expiration_time",
+ "1 d",
+ "--timestamp_formatter",
+ "yyyy-MM-dd HH:mm",
+ "--timestamp_pattern",
+ "$dt $hm")
+ .run();
+
+ plan = table.newReadBuilder().newScan().plan();
+ actual = getResult(table.newReadBuilder().newRead(), plan.splits(),
ROW_TYPE);
+
+ expected = Arrays.asList("+I[2, 9999-09-20, 02:00]");
assertThat(actual).isEqualTo(expected);
}
@@ -94,13 +129,14 @@ public class ExpirePartitionsActionITCase extends
ActionITCaseBase {
RowType rowType =
RowType.of(
- new DataType[] {DataTypes.STRING(),
DataTypes.STRING()},
- new String[] {"k", "v"});
- String[] pk = {"k", "v"};
+ new DataType[] {DataTypes.STRING(),
DataTypes.STRING(), DataTypes.STRING()},
+ new String[] {"k", "dt", "hm"});
+ String[] pk = {"k", "dt", "hm"};
+ String[] partitions = {"dt", "hm"};
FileStoreTable table =
createFileStoreTable(
rowType,
- Collections.singletonList("v"),
+ new ArrayList<>(Arrays.asList(partitions)),
new ArrayList<>(Arrays.asList(pk)),
Collections.singletonList("k"),
Collections.emptyMap());
@@ -110,8 +146,16 @@ public class ExpirePartitionsActionITCase extends
ActionITCaseBase {
commit = writeBuilder.newCommit();
// 3 snapshots
- writeData(rowData(BinaryString.fromString("1"),
BinaryString.fromString("2024-01-01")));
- writeData(rowData(BinaryString.fromString("2"),
BinaryString.fromString("2024-12-31")));
+ writeData(
+ rowData(
+ BinaryString.fromString("1"),
+ BinaryString.fromString("2024-01-01"),
+ BinaryString.fromString("01:00")));
+ writeData(
+ rowData(
+ BinaryString.fromString("2"),
+ BinaryString.fromString("9999-09-20"),
+ BinaryString.fromString("02:00")));
return table;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index d476096f0..71a6dc466 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -297,6 +297,56 @@ public class ExpirePartitionsProcedureITCase extends
CatalogITCaseBase {
assertThat(read(table, consumerReadResult)).isEmpty();
}
+ @Test
+ public void testPartitionExpireWithTimePartition() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " hm STRING,"
+ + " PRIMARY KEY (k, dt, hm) NOT ENFORCED"
+ + ") PARTITIONED BY (dt, hm) WITH ("
+ + " 'bucket' = '1'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+ // Test there are no expired partitions.
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", expiration_time => '1 d'"
+ + ", timestamp_pattern => '$dt $hm'"
+ + ", timestamp_formatter =>
'yyyy-MM-dd HH:mm')"))
+ .containsExactlyInAnyOrder("No expired partitions.");
+
+ sql("INSERT INTO T VALUES ('1', '2024-06-01', '01:00')");
+ sql("INSERT INTO T VALUES ('2', '2024-06-02', '02:00')");
+ // This partition never expires.
+ sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09', '99:99')");
+
+ Function<InternalRow, String> consumerReadResult =
+ (InternalRow row) ->
+ row.getString(0) + ":" + row.getString(1) + ":" +
row.getString(2);
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder(
+ "1:2024-06-01:01:00",
+ "2:2024-06-02:02:00",
+ "Never-expire:9999-09-09:99:99");
+
+ // Show a list of expired partitions.
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", expiration_time => '1 d'"
+ + ", timestamp_pattern => '$dt $hm'"
+ + ", timestamp_formatter =>
'yyyy-MM-dd HH:mm')"))
+ .containsExactlyInAnyOrder("dt=2024-06-01, hm=01:00",
"dt=2024-06-02, hm=02:00");
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder("Never-expire:9999-09-09:99:99");
+ }
+
/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()