This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0b3ed9dc01 [core] Unify the order of procedure loading properties (#4657)
0b3ed9dc01 is described below
commit 0b3ed9dc01594a2b5e8d97a5ce15c7b392edf308
Author: askwang <[email protected]>
AuthorDate: Mon Apr 7 12:09:29 2025 +0800
[core] Unify the order of procedure loading properties (#4657)
---
.../org/apache/paimon/utils/ProcedureUtils.java | 98 +++++++++++++++
.../apache/paimon/operation/PartitionExpire.java | 5 -
.../flink/procedure/ExpirePartitionsProcedure.java | 46 +++----
.../paimon/flink/action/ExpireSnapshotsAction.java | 8 +-
.../flink/action/ExpireSnapshotsActionFactory.java | 5 +-
.../flink/procedure/CompactManifestProcedure.java | 26 ++--
.../flink/procedure/ExpirePartitionsProcedure.java | 52 ++++----
.../flink/procedure/ExpireSnapshotsProcedure.java | 44 +++----
.../UnawareBucketNewFilesCompactionITCase.java | 3 +-
.../procedure/ExpirePartitionsProcedureITCase.java | 86 +++++++++++++
.../procedure/ExpireSnapshotsProcedureITCase.java | 36 +++++-
.../spark/procedure/CompactManifestProcedure.java | 13 +-
.../paimon/spark/procedure/CompactProcedure.java | 10 +-
.../spark/procedure/ExpirePartitionsProcedure.java | 43 ++++---
.../spark/procedure/ExpireSnapshotsProcedure.java | 38 +++---
.../CreateTagFromTimestampProcedureTest.scala | 3 +-
.../procedure/ExpirePartitionsProcedureTest.scala | 140 +++++++++++++++++++++
.../procedure/ExpireSnapshotsProcedureTest.scala | 119 +++++++++++++++++-
.../procedure/RemoveOrphanFilesProcedureTest.scala | 2 +-
.../paimon/spark/sql/AnalyzeTableTestBase.scala | 2 +-
20 files changed, 631 insertions(+), 148 deletions(-)
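The net effect of this change: every procedure now funnels its arguments through ProcedureUtils into a single dynamic-option map that is applied with table.copy(dynamicOptions), so explicit procedure arguments override entries from the free-form `options` string, and both override table properties. A minimal JDK-only sketch of that precedence (class name and option values are illustrative, not part of the commit):

    import java.util.HashMap;
    import java.util.Map;

    public class OptionPrecedenceSketch {
        public static void main(String[] args) {
            // Table property, lowest precedence.
            Map<String, String> tableProps = new HashMap<>();
            tableProps.put("partition.expiration-max-num", "10");

            // fillInPartitionOptions: the options string is parsed first ...
            Map<String, String> dynamicOptions = new HashMap<>();
            dynamicOptions.put("partition.expiration-max-num", "5");
            // ... then explicit procedure arguments overwrite matching keys.
            dynamicOptions.put("partition.expiration-max-num", "2");

            // table.copy(dynamicOptions) layers the merged map over table properties.
            Map<String, String> effective = new HashMap<>(tableProps);
            effective.putAll(dynamicOptions);
            System.out.println(effective.get("partition.expiration-max-num")); // prints 2
        }
    }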
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java
new file mode 100644
index 0000000000..d13cb3c6df
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ProcedureUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.ExpireConfig;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+
+/** Utils for procedure. */
+public class ProcedureUtils {
+
+ public static Map<String, String> fillInPartitionOptions(
+ String expireStrategy,
+ String timestampFormatter,
+ String timestampPattern,
+ String expirationTime,
+ Integer maxExpires,
+ String options) {
+
+ HashMap<String, String> dynamicOptions = new HashMap<>();
+ putAllOptions(dynamicOptions, options);
+        putIfNotEmpty(
+                dynamicOptions, CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
+        putIfNotEmpty(
+                dynamicOptions,
+                CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(),
+                timestampFormatter);
+        putIfNotEmpty(
+                dynamicOptions, CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
+        putIfNotEmpty(dynamicOptions, CoreOptions.PARTITION_EXPIRATION_TIME.key(), expirationTime);
+        // Set check interval to 0 for dedicated partition expiration.
+        putIfNotEmpty(dynamicOptions, CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "0");
+        putIfNotEmpty(
+                dynamicOptions,
+                CoreOptions.PARTITION_EXPIRATION_MAX_NUM.key(),
+                maxExpires == null ? null : String.valueOf(maxExpires));
+ return dynamicOptions;
+ }
+
+    public static void putAllOptions(HashMap<String, String> dynamicOptions, String options) {
+        if (!StringUtils.isNullOrWhitespaceOnly(options)) {
+            dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
+        }
+    }
+
+ public static void putIfNotEmpty(
+ HashMap<String, String> dynamicOptions, String key, String value) {
+ if (!StringUtils.isNullOrWhitespaceOnly(value)) {
+ dynamicOptions.put(key, value);
+ }
+ }
+
+ public static ExpireConfig.Builder fillInSnapshotOptions(
+ CoreOptions tableOptions,
+ Integer retainMax,
+ Integer retainMin,
+ String olderThanStr,
+ Integer maxDeletes) {
+
+ ExpireConfig.Builder builder = ExpireConfig.builder();
+        builder.snapshotRetainMax(
+                        Optional.ofNullable(retainMax).orElse(tableOptions.snapshotNumRetainMax()))
+                .snapshotRetainMin(
+                        Optional.ofNullable(retainMin).orElse(tableOptions.snapshotNumRetainMin()))
+                .snapshotMaxDeletes(
+                        Optional.ofNullable(maxDeletes).orElse(tableOptions.snapshotExpireLimit()))
+                .snapshotTimeRetain(tableOptions.snapshotTimeRetain());
+        if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
+            long olderThanMills =
+                    DateTimeUtils.parseTimestampData(olderThanStr, 3, TimeZone.getDefault())
+                            .getMillisecond();
+            builder.snapshotTimeRetain(
+                    Duration.ofMillis(System.currentTimeMillis() - olderThanMills));
+ }
+ return builder;
+ }
+}
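The `options` argument threaded through these procedures is the same comma-separated `key=value` string that CompactProcedure already accepts; putAllOptions delegates to ParameterUtils.parseCommaSeparatedKeyValues. A rough standalone approximation of that format (an illustrative sketch only; the real helper may handle quoting and edge cases differently):

    import java.util.HashMap;
    import java.util.Map;

    public class OptionsStringSketch {
        // Splits "k1 = v1, k2 = v2" into a map, trimming surrounding whitespace.
        static Map<String, String> parse(String options) {
            Map<String, String> result = new HashMap<>();
            for (String pair : options.split(",")) {
                String[] kv = pair.split("=", 2);
                result.put(kv[0].trim(), kv[1].trim());
            }
            return result;
        }

        public static void main(String[] args) {
            System.out.println(
                    parse("partition.expiration-time = 1d, partition.expiration-max-num = 2"));
        }
    }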
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 082572d5c4..4ac71db7aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -95,11 +95,6 @@ public class PartitionExpire {
maxExpireNum);
}
- public PartitionExpire withMaxExpireNum(int maxExpireNum) {
- this.maxExpireNum = maxExpireNum;
- return this;
- }
-
public List<Map<String, String>> expire(long commitIdentifier) {
return expire(LocalDateTime.now(), commitIdentifier);
}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 2b0e0d1c68..3ad96fd593 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -18,22 +18,19 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.flink.table.procedure.ProcedureContext;
-import java.time.Duration;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
-
/** A procedure to expire partitions. */
public class ExpirePartitionsProcedure extends ProcedureBase {
@@ -59,6 +56,7 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
timestampFormatter,
timestampPattern,
expireStrategy,
+ null,
null);
}
@@ -69,26 +67,28 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
String timestampFormatter,
String timestampPattern,
String expireStrategy,
- Integer maxExpires)
+ Integer maxExpires,
+ String options)
throws Catalog.TableNotExistException {
- FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+
+ Map<String, String> dynamicOptions =
+ ProcedureUtils.fillInPartitionOptions(
+ expireStrategy,
+ timestampFormatter,
+ timestampPattern,
+ expirationTime,
+ maxExpires,
+ options);
+
+ Table table = table(tableId).copy(dynamicOptions);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
- Map<String, String> map = new HashMap<>();
-        map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
- PartitionExpire partitionExpire =
- fileStore.newPartitionExpire(
- "",
- fileStoreTable,
- TimeUtils.parseDuration(expirationTime),
- Duration.ofMillis(0L),
- createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), fileStore.partitionType()));
- if (maxExpires != null) {
- partitionExpire.withMaxExpireNum(maxExpires);
- }
+        PartitionExpire partitionExpire = fileStore.newPartitionExpire("", fileStoreTable);
+        Preconditions.checkNotNull(
+                partitionExpire,
+                "Both the partition expiration time and partition field can not be null.");
+
        List<Map<String, String>> expired = partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
? new String[] {"No expired partitions."}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java
index 5ae5c486d7..6822f6a7a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java
@@ -33,6 +33,7 @@ public class ExpireSnapshotsAction extends ActionBase {
private final Integer retainMin;
private final String olderThan;
private final Integer maxDeletes;
+ private final String options;
public ExpireSnapshotsAction(
String database,
@@ -41,7 +42,8 @@ public class ExpireSnapshotsAction extends ActionBase {
Integer retainMax,
Integer retainMin,
String olderThan,
- Integer maxDeletes) {
+ Integer maxDeletes,
+ String options) {
super(catalogConfig);
this.database = database;
this.table = table;
@@ -49,6 +51,7 @@ public class ExpireSnapshotsAction extends ActionBase {
this.retainMin = retainMin;
this.olderThan = olderThan;
this.maxDeletes = maxDeletes;
+ this.options = options;
}
public void run() throws Exception {
@@ -60,6 +63,7 @@ public class ExpireSnapshotsAction extends ActionBase {
retainMax,
retainMin,
olderThan,
- maxDeletes);
+ maxDeletes,
+ options);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java
index f169c9f6f1..91628c2e37 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsActionFactory.java
@@ -31,6 +31,7 @@ public class ExpireSnapshotsActionFactory implements ActionFactory {
private static final String RETAIN_MIN = "retain_min";
private static final String OLDER_THAN = "older_than";
private static final String MAX_DELETES = "max_deletes";
+ private static final String OPTIONS = "options";
@Override
public String identifier() {
@@ -46,6 +47,7 @@ public class ExpireSnapshotsActionFactory implements ActionFactory {
        String olderThan = params.has(OLDER_THAN) ? params.get(OLDER_THAN) : null;
        Integer maxDeletes =
                params.has(MAX_DELETES) ? Integer.parseInt(params.get(MAX_DELETES)) : null;
+ String options = params.has(OPTIONS) ? params.get(OPTIONS) : null;
ExpireSnapshotsAction action =
new ExpireSnapshotsAction(
@@ -55,7 +57,8 @@ public class ExpireSnapshotsActionFactory implements ActionFactory {
retainMax,
retainMin,
olderThan,
- maxDeletes);
+ maxDeletes,
+ options);
return Optional.of(action);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
index c6b0a170b0..488aee544e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
@@ -19,15 +19,16 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
-import java.util.Collections;
+import java.util.HashMap;
/** Compact manifest file to reduce deleted manifest entries. */
public class CompactManifestProcedure extends ProcedureBase {
@@ -39,14 +40,21 @@ public class CompactManifestProcedure extends ProcedureBase {
return "compact_manifest";
}
-    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
-    public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "options", type =
@DataTypeHint("STRING"), isOptional = true)
+ })
+ public String[] call(ProcedureContext procedureContext, String tableId,
String options)
+ throws Exception {
- Table table =
- table(tableId)
- .copy(
- Collections.singletonMap(
-                                        CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));
+ FileStoreTable table = (FileStoreTable) table(tableId);
+ HashMap<String, String> dynamicOptions = new HashMap<>();
+ ProcedureUtils.putIfNotEmpty(
+                dynamicOptions, CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER);
+ ProcedureUtils.putAllOptions(dynamicOptions, options);
+
+ table = table.copy(dynamicOptions);
        try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
commit.compactManifests();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 582b4711fd..8a6528bd02 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -18,12 +18,13 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
@@ -31,13 +32,9 @@ import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;
-import java.time.Duration;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
-
/** A procedure to expire partitions. */
public class ExpirePartitionsProcedure extends ProcedureBase {
@Override
@@ -48,7 +45,10 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
@ProcedureHint(
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "expiration_time", type =
@DataTypeHint(value = "STRING")),
+ @ArgumentHint(
+ name = "expiration_time",
+ type = @DataTypeHint(value = "STRING"),
+ isOptional = true),
@ArgumentHint(
name = "timestamp_formatter",
type = @DataTypeHint("STRING"),
@@ -64,7 +64,8 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
@ArgumentHint(
name = "max_expires",
type = @DataTypeHint("INTEGER"),
- isOptional = true)
+ isOptional = true),
+ @ArgumentHint(name = "options", type =
@DataTypeHint("STRING"), isOptional = true)
})
public @DataTypeHint("ROW< expired_partitions STRING>") Row[] call(
ProcedureContext procedureContext,
@@ -73,26 +74,27 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
String timestampFormatter,
String timestampPattern,
String expireStrategy,
- Integer maxExpires)
+ Integer maxExpires,
+ String options)
throws Catalog.TableNotExistException {
- FileStoreTable fileStoreTable = (FileStoreTable) table(tableId);
+ Map<String, String> dynamicOptions =
+ ProcedureUtils.fillInPartitionOptions(
+ expireStrategy,
+ timestampFormatter,
+ timestampPattern,
+ expirationTime,
+ maxExpires,
+ options);
+ Table table = table(tableId).copy(dynamicOptions);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
- Map<String, String> map = new HashMap<>();
-        map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
-        map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
- PartitionExpire partitionExpire =
- fileStore.newPartitionExpire(
- "",
- fileStoreTable,
- TimeUtils.parseDuration(expirationTime),
- Duration.ofMillis(0L),
- createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), fileStore.partitionType()));
- if (maxExpires != null) {
- partitionExpire.withMaxExpireNum(maxExpires);
- }
+        PartitionExpire partitionExpire = fileStore.newPartitionExpire("", fileStoreTable);
+
+ Preconditions.checkNotNull(
+ partitionExpire,
+ "Both the partition expiration time and partition field can
not be null.");
+
List<Map<String, String>> expired =
partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
? new Row[] {Row.of("No expired partitions.")}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
index 9b662fc369..b7c481828d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java
@@ -18,18 +18,20 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.table.ExpireSnapshots;
-import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
-import java.time.Duration;
-import java.util.TimeZone;
+import java.util.HashMap;
/** A procedure to expire snapshots. */
public class ExpireSnapshotsProcedure extends ProcedureBase {
@@ -57,7 +59,8 @@ public class ExpireSnapshotsProcedure extends ProcedureBase {
@ArgumentHint(
name = "max_deletes",
type = @DataTypeHint("INTEGER"),
- isOptional = true)
+ isOptional = true),
+ @ArgumentHint(name = "options", type =
@DataTypeHint("STRING"), isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
@@ -65,27 +68,20 @@ public class ExpireSnapshotsProcedure extends ProcedureBase {
Integer retainMax,
Integer retainMin,
String olderThanStr,
- Integer maxDeletes)
+ Integer maxDeletes,
+ String options)
throws Catalog.TableNotExistException {
- ExpireSnapshots expireSnapshots = table(tableId).newExpireSnapshots();
- ExpireConfig.Builder builder = ExpireConfig.builder();
- if (retainMax != null) {
- builder.snapshotRetainMax(retainMax);
- }
- if (retainMin != null) {
- builder.snapshotRetainMin(retainMin);
- }
- if (olderThanStr != null) {
- builder.snapshotTimeRetain(
- Duration.ofMillis(
- System.currentTimeMillis()
- - DateTimeUtils.parseTimestampData(
-                                            olderThanStr, 3, TimeZone.getDefault())
- .getMillisecond()));
- }
- if (maxDeletes != null) {
- builder.snapshotMaxDeletes(maxDeletes);
- }
+ Table table = table(tableId);
+ HashMap<String, String> dynamicOptions = new HashMap<>();
+ ProcedureUtils.putAllOptions(dynamicOptions, options);
+
+ table = table.copy(dynamicOptions);
+ ExpireSnapshots expireSnapshots = table.newExpireSnapshots();
+
+ CoreOptions tableOptions = ((FileStoreTable) table).store().options();
+ ExpireConfig.Builder builder =
+ ProcedureUtils.fillInSnapshotOptions(
+                        tableOptions, retainMax, retainMin, olderThanStr, maxDeletes);
        return new String[] {expireSnapshots.config(builder.build()).expire() + ""};
}
}
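Where the old code set builder fields only for non-null arguments, fillInSnapshotOptions always resolves a complete ExpireConfig, falling back to the table options (which the copy() above may already have overridden from the `options` string). A JDK-only sketch of that fallback idiom, with illustrative values:

    import java.util.Optional;

    public class SnapshotFallbackSketch {
        public static void main(String[] args) {
            Integer retainMaxArg = null; // procedure argument, absent here
            int tableRetainMax = 5;      // e.g. 'snapshot.num-retained.max' = '5'

            // Same idiom as fillInSnapshotOptions: argument wins, else table option.
            int effective = Optional.ofNullable(retainMaxArg).orElse(tableRetainMax);
            System.out.println(effective); // prints 5
        }
    }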
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
index 90444287d4..45d6197f85 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
@@ -118,7 +118,8 @@ public class UnawareBucketNewFilesCompactionITCase extends AbstractTestBase {
assertThat(actual.keySet()).hasSameElementsAs(values);
assertThat(actual.values()).allMatch(i -> i == 3);
- tEnv.executeSql("CALL sys.expire_snapshots(`table` => 'default.T',
retain_max => 1)")
+ tEnv.executeSql(
+ "CALL sys.expire_snapshots(`table` => 'default.T',
retain_max => 1, retain_min => 1)")
.await();
assertThat(fileIO.listStatus(new Path(warehouse,
"default.db/T/pt=0/bucket-0"))).hasSize(1);
assertThat(fileIO.listStatus(new Path(warehouse,
"default.db/T/pt=1/bucket-0"))).hasSize(1);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
index da091722fb..2e5eff6ced 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java
@@ -31,6 +31,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT Case for {@link ExpirePartitionsProcedure}. */
public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
@@ -452,6 +453,91 @@ public class ExpirePartitionsProcedureITCase extends CatalogITCaseBase {
                .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09");
}
+ @Test
+ public void testExpirePartitionsLoadTablePropsFirst() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1', "
+ + " 'write-only' = 'true', "
+ + " 'partition.timestamp-formatter' = 'yyyy-MM-dd', "
+ + " 'partition.expiration-max-num'='2'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+
+ sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+ sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+ sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+ // This partition never expires.
+ sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+ Function<InternalRow, String> consumerReadResult =
+ (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder(
+ "a:2024-06-01", "b:2024-06-02", "c:2024-06-03",
"Never-expire:9999-09-09");
+
+        // no 'partition.expiration-time' value in table property or procedure parameter.
+        assertThatThrownBy(() -> sql("CALL sys.expire_partitions(`table` => 'default.T')"))
+ .rootCause()
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining(
+ "Both the partition expiration time and partition
field can not be null.");
+
+ // 'partition.timestamp-formatter' value using table property.
+ // 'partition.expiration-time' value using procedure parameter.
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", expiration_time => '1 d')"))
+ .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder("c:2024-06-03",
"Never-expire:9999-09-09");
+ }
+
+ @Test
+ public void testExpirePartitionsUseOptionsParam() throws Exception {
+ sql(
+ "CREATE TABLE T ("
+ + " k STRING,"
+ + " dt STRING,"
+ + " PRIMARY KEY (k, dt) NOT ENFORCED"
+ + ") PARTITIONED BY (dt) WITH ("
+ + " 'bucket' = '1'"
+ + ")");
+ FileStoreTable table = paimonTable("T");
+
+ sql("INSERT INTO T VALUES ('a', '2024-06-01')");
+ sql("INSERT INTO T VALUES ('b', '2024-06-02')");
+ sql("INSERT INTO T VALUES ('c', '2024-06-03')");
+ // This partition never expires.
+ sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')");
+ Function<InternalRow, String> consumerReadResult =
+ (InternalRow row) -> row.getString(0) + ":" + row.getString(1);
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder(
+ "a:2024-06-01", "b:2024-06-02", "c:2024-06-03",
"Never-expire:9999-09-09");
+
+ // set conf in options.
+ assertThat(
+ callExpirePartitions(
+ "CALL sys.expire_partitions("
+ + "`table` => 'default.T'"
+ + ", options =>
'partition.expiration-time = 1d,"
+ + " partition.expiration-max-num = 2, "
+ + " partition.timestamp-formatter =
yyyy-MM-dd')"))
+ .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02");
+
+ assertThat(read(table, consumerReadResult))
+ .containsExactlyInAnyOrder("c:2024-06-03",
"Never-expire:9999-09-09");
+ }
+
/** Return a list of expired partitions. */
public List<String> callExpirePartitions(String callSql) {
return sql(callSql).stream()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
index 77b7409f41..4c530130a2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java
@@ -40,7 +40,8 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
public void testExpireSnapshotsProcedure() throws Exception {
sql(
"CREATE TABLE word_count ( word STRING PRIMARY KEY NOT
ENFORCED, cnt INT)"
- + " WITH ( 'num-sorted-run.compaction-trigger' =
'9999' )");
+ + " WITH ( 'num-sorted-run.compaction-trigger' =
'9999',"
+ + "'write-only' = 'true', 'snapshot.num-retained.min'
= '1')");
FileStoreTable table = paimonTable("word_count");
SnapshotManager snapshotManager = table.snapshotManager();
@@ -81,7 +82,9 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
public void testExpireSnapshotsAction() throws Exception {
sql(
"CREATE TABLE word_count ( word STRING PRIMARY KEY NOT
ENFORCED, cnt INT)"
- + " WITH ( 'num-sorted-run.compaction-trigger' =
'9999' )");
+ + " WITH ( 'num-sorted-run.compaction-trigger' =
'9999',"
+ + "'write-only' = 'true', 'snapshot.num-retained.min'
= '1')");
+
FileStoreTable table = paimonTable("word_count");
        StreamExecutionEnvironment env =
                streamExecutionEnvironmentBuilder().streamingMode().build();
@@ -162,6 +165,35 @@ public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase {
checkSnapshots(snapshotManager, 6, 6);
}
+ @Test
+ public void testLoadTablePropsFirstAndOptions() throws Exception {
+ sql(
+ "CREATE TABLE word_count ( word STRING PRIMARY KEY NOT
ENFORCED, cnt INT)"
+ + " WITH ( 'num-sorted-run.compaction-trigger' =
'9999',"
+ + "'write-only' = 'true', 'snapshot.num-retained.min'
= '1', 'snapshot.num-retained.max' = '5')");
+ FileStoreTable table = paimonTable("word_count");
+ SnapshotManager snapshotManager = table.snapshotManager();
+
+ // initially prepare 6 snapshots, expected snapshots (1, 2, 3, 4, 5, 6)
+ for (int i = 0; i < 6; ++i) {
+ sql("INSERT INTO word_count VALUES ('" + String.valueOf(i) + "', "
+ i + ")");
+ }
+ checkSnapshots(snapshotManager, 1, 6);
+
+ // snapshot.num-retained.max is 5, expected snapshots (2, 3, 4, 5, 6)
+ sql("CALL sys.expire_snapshots(`table` => 'default.word_count')");
+ checkSnapshots(snapshotManager, 2, 6);
+
+        // older_than => timestamp of snapshot 6, snapshot.expire.limit => 1, expected snapshots (3,
+ // 4, 5, 6)
+        Timestamp ts6 = new Timestamp(snapshotManager.latestSnapshot().timeMillis());
+ sql(
+ "CALL sys.expire_snapshots(`table` => 'default.word_count',
older_than => '"
+ + ts6.toString()
+ + "', options => 'snapshot.expire.limit=1')");
+ checkSnapshots(snapshotManager, 3, 6);
+ }
+
    protected void checkSnapshots(SnapshotManager sm, int earliest, int latest) throws IOException {
assertThat(sm.snapshotCount()).isEqualTo(latest - earliest + 1);
assertThat(sm.earliestSnapshotId()).isEqualTo(earliest);
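The older_than argument in the test above works because fillInSnapshotOptions converts the parsed timestamp into a retain duration (now minus older_than). A JDK-only sketch of that conversion, with java.sql.Timestamp standing in for DateTimeUtils.parseTimestampData:

    import java.sql.Timestamp;
    import java.time.Duration;

    public class OlderThanSketch {
        public static void main(String[] args) {
            Timestamp olderThan = Timestamp.valueOf("2025-04-07 12:00:00");
            Duration timeRetain =
                    Duration.ofMillis(System.currentTimeMillis() - olderThan.getTime());
            // Snapshots created before olderThan now exceed the retain duration
            // and become candidates for expiration (subject to retain-min/limit).
            System.out.println(timeRetain);
        }
    }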
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
index 12a3a286f4..5a6837f6c1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.procedure;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -29,6 +30,8 @@ import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
+import java.util.HashMap;
+
import static org.apache.spark.sql.types.DataTypes.StringType;
/**
@@ -41,7 +44,10 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
public class CompactManifestProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {ProcedureParameter.required("table",
StringType)};
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.optional("options", StringType)
+ };
private static final StructType OUTPUT_TYPE =
new StructType(
@@ -67,7 +73,12 @@ public class CompactManifestProcedure extends BaseProcedure {
public InternalRow[] call(InternalRow args) {
        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String options = args.isNullAt(1) ? null : args.getString(1);
+
Table table = loadSparkTable(tableIdent).getTable();
+ HashMap<String, String> dynamicOptions = new HashMap<>();
+ ProcedureUtils.putAllOptions(dynamicOptions, options);
+ table = table.copy(dynamicOptions);
        try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
commit.compactManifests();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 689e2cabd3..50cc5ad986 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -48,6 +48,7 @@ import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.TimeUtils;
@@ -197,11 +198,10 @@ public class CompactProcedure extends BaseProcedure {
table.partitionKeys());
}
- Map<String, String> dynamicOptions = new HashMap<>();
- dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
- if (!StringUtils.isNullOrWhitespaceOnly(options)) {
-            dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
- }
+ HashMap<String, String> dynamicOptions = new HashMap<>();
+ ProcedureUtils.putIfNotEmpty(
+                dynamicOptions, CoreOptions.WRITE_ONLY.key(), "false");
+ ProcedureUtils.putAllOptions(dynamicOptions, options);
table = table.copy(dynamicOptions);
InternalRow internalRow =
newInternalRow(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index dd9388b67a..036b3b3c8b 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -18,11 +18,11 @@
package org.apache.paimon.spark.procedure;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.TimeUtils;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -32,12 +32,9 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
-import java.time.Duration;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.partition.PartitionExpireStrategy.createPartitionExpireStrategy;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -47,11 +44,12 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
- ProcedureParameter.required("expiration_time", StringType),
+ ProcedureParameter.optional("expiration_time", StringType),
ProcedureParameter.optional("timestamp_formatter", StringType),
ProcedureParameter.optional("timestamp_pattern", StringType),
ProcedureParameter.optional("expire_strategy", StringType),
- ProcedureParameter.optional("max_expires", IntegerType)
+ ProcedureParameter.optional("max_expires", IntegerType),
+ ProcedureParameter.optional("options", StringType)
};
private static final StructType OUTPUT_TYPE =
@@ -77,32 +75,33 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
- String expirationTime = args.getString(1);
+ String expirationTime = args.isNullAt(1) ? null : args.getString(1);
        String timestampFormatter = args.isNullAt(2) ? null : args.getString(2);
String timestampPattern = args.isNullAt(3) ? null : args.getString(3);
String expireStrategy = args.isNullAt(4) ? null : args.getString(4);
Integer maxExpires = args.isNullAt(5) ? null : args.getInt(5);
+ String options = args.isNullAt(6) ? null : args.getString(6);
+
return modifyPaimonTable(
tableIdent,
table -> {
+ Map<String, String> dynamicOptions =
+ ProcedureUtils.fillInPartitionOptions(
+ expireStrategy,
+ timestampFormatter,
+ timestampPattern,
+ expirationTime,
+ maxExpires,
+ options);
+ table = table.copy(dynamicOptions);
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore fileStore = fileStoreTable.store();
- Map<String, String> map = new HashMap<>();
-                    map.put(CoreOptions.PARTITION_EXPIRATION_STRATEGY.key(), expireStrategy);
-                    map.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), timestampFormatter);
-                    map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
PartitionExpire partitionExpire =
- fileStore.newPartitionExpire(
- "",
- fileStoreTable,
- TimeUtils.parseDuration(expirationTime),
- Duration.ofMillis(0L),
- createPartitionExpireStrategy(
-                                            CoreOptions.fromMap(map), fileStore.partitionType()));
- if (maxExpires != null) {
- partitionExpire.withMaxExpireNum(maxExpires);
- }
+ fileStore.newPartitionExpire("", fileStoreTable);
+ Preconditions.checkNotNull(
+ partitionExpire,
+ "Both the partition expiration time and partition
field can not be null.");
List<Map<String, String>> expired =
partitionExpire.expire(Long.MAX_VALUE);
return expired == null || expired.isEmpty()
? new InternalRow[] {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
index f24f18067a..c54a3dbdaf 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java
@@ -18,10 +18,11 @@
package org.apache.paimon.spark.procedure;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.table.ExpireSnapshots;
-import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.StringUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ProcedureUtils;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -30,8 +31,7 @@ import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import java.time.Duration;
-import java.util.TimeZone;
+import java.util.HashMap;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -45,7 +45,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.optional("retain_max", IntegerType),
ProcedureParameter.optional("retain_min", IntegerType),
ProcedureParameter.optional("older_than", StringType),
- ProcedureParameter.optional("max_deletes", IntegerType)
+ ProcedureParameter.optional("max_deletes", IntegerType),
+ ProcedureParameter.optional("options", StringType)
};
private static final StructType OUTPUT_TYPE =
@@ -76,29 +77,20 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
Integer retainMin = args.isNullAt(2) ? null : args.getInt(2);
String olderThanStr = args.isNullAt(3) ? null : args.getString(3);
Integer maxDeletes = args.isNullAt(4) ? null : args.getInt(4);
+ String options = args.isNullAt(5) ? null : args.getString(5);
return modifyPaimonTable(
tableIdent,
table -> {
+ HashMap<String, String> dynamicOptions = new HashMap<>();
+ ProcedureUtils.putAllOptions(dynamicOptions, options);
+ table = table.copy(dynamicOptions);
ExpireSnapshots expireSnapshots =
table.newExpireSnapshots();
- ExpireConfig.Builder builder = ExpireConfig.builder();
- if (retainMax != null) {
- builder.snapshotRetainMax(retainMax);
- }
- if (retainMin != null) {
- builder.snapshotRetainMin(retainMin);
- }
- if (!StringUtils.isNullOrWhitespaceOnly(olderThanStr)) {
- long olderThanMills =
- DateTimeUtils.parseTimestampData(
-                                        olderThanStr, 3, TimeZone.getDefault())
- .getMillisecond();
- builder.snapshotTimeRetain(
-                            Duration.ofMillis(System.currentTimeMillis() - olderThanMills));
- }
- if (maxDeletes != null) {
- builder.snapshotMaxDeletes(maxDeletes);
- }
+
+                    CoreOptions tableOptions = ((FileStoreTable) table).store().options();
+ ExpireConfig.Builder builder =
+ ProcedureUtils.fillInSnapshotOptions(
+                                    tableOptions, retainMax, retainMin, olderThanStr, maxDeletes);
                    int deleted = expireSnapshots.config(builder.build()).expire();
return new InternalRow[] {newInternalRow(deleted)};
});
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
index 301b769288..e9b00298e4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala
@@ -148,7 +148,8 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea
// make snapshot 1 expire.
checkAnswer(
- spark.sql("CALL paimon.sys.expire_snapshots(table => 'test.T',
retain_max => 1)"),
+ spark.sql(
+ "CALL paimon.sys.expire_snapshots(table => 'test.T',
retain_max => 1, retain_min => 1)"),
Row(1) :: Nil)
      // create tag from timestamp that earlier than the expired snapshot 1.
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
index 9f0d23dc93..be7dc26241 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamTest
+import org.assertj.core.api.Assertions.assertThatThrownBy
/** IT Case for [[ExpirePartitionsProcedure]]. */
class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -616,4 +617,143 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest
}
}
}
+
+ test("Paimon Procedure: expire partitions load table property first") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(s"""
+ |CREATE TABLE T (k STRING, pt STRING)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'k,pt',
+ | 'bucket' = '1',
+ | 'write-only' = 'true',
+ | 'partition.timestamp-formatter' = 'yyyy-MM-dd',
+ | 'partition.expiration-max-num'='2')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(String, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("k", "pt")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T")
+
+ try {
+ // snapshot-1
+ inputData.addData(("a", "2024-06-01"))
+ stream.processAllAvailable()
+
+ // snapshot-2
+ inputData.addData(("b", "2024-06-02"))
+ stream.processAllAvailable()
+
+ // snapshot-3
+ inputData.addData(("c", "2024-06-03"))
+ stream.processAllAvailable()
+
+ // This partition never expires.
+ inputData.addData(("Never-expire", "9999-09-09"))
+ stream.processAllAvailable()
+
+ checkAnswer(
+ query(),
+ Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c",
"2024-06-03") :: Row(
+ "Never-expire",
+ "9999-09-09") :: Nil)
+
+ // 'partition.timestamp-formatter' value using table property.
+ // 'partition.expiration-time' value using procedure parameter.
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_partitions(table => 'test.T',
expiration_time => '1 d')"),
+ Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
+ )
+
+ checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire",
"9999-09-09") :: Nil)
+
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
+
+ test("Paimon Procedure: expire partitions add options parameter") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(s"""
+ |CREATE TABLE T (k STRING, pt STRING)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'k,pt',
+ | 'bucket' = '1')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(String, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("k", "pt")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T")
+
+ try {
+ // snapshot-1
+ inputData.addData(("a", "2024-06-01"))
+ stream.processAllAvailable()
+
+ // snapshot-2
+ inputData.addData(("b", "2024-06-02"))
+ stream.processAllAvailable()
+
+ // snapshot-3
+ inputData.addData(("c", "2024-06-03"))
+ stream.processAllAvailable()
+
+ // This partition never expires.
+ inputData.addData(("Never-expire", "9999-09-09"))
+ stream.processAllAvailable()
+
+ checkAnswer(
+ query(),
+ Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c",
"2024-06-03") :: Row(
+ "Never-expire",
+ "9999-09-09") :: Nil)
+
+ // set conf in options.
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_partitions(table => 'test.T', " +
+ "options => 'partition.expiration-time = 1d," +
+ " partition.expiration-max-num = 2," +
+ " partition.timestamp-formatter = yyyy-MM-dd')"),
+ Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil
+ )
+
+ checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire",
"9999-09-09") :: Nil)
+
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
index da7be42310..b39aa5d058 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala
@@ -39,7 +39,8 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
// define a change-log table and test `forEachBatch` api
spark.sql(s"""
|CREATE TABLE T (a INT, b STRING)
- |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
+ |TBLPROPERTIES ('primary-key'='a', 'bucket'='3',
+                     |'write-only' = 'true', 'snapshot.num-retained.min' = '1')
|""".stripMargin)
val location = loadTable("T").location().toString
@@ -144,7 +145,8 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
test("Paimon Procedure: test parameter order_than with string type") {
sql(
"CREATE TABLE T (a INT, b STRING) " +
- "TBLPROPERTIES ( 'num-sorted-run.compaction-trigger' = '999' )")
+ "TBLPROPERTIES ( 'num-sorted-run.compaction-trigger' = '999'," +
+ "'write-only' = 'true', 'snapshot.num-retained.min' = '1')")
val table = loadTable("T")
val snapshotManager = table.snapshotManager
@@ -160,6 +162,119 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest {
checkSnapshots(snapshotManager, 3, 5)
}
+ test("Paimon Procedure: expire snapshots load table property first") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ('primary-key'='a', 'bucket'='3',
+ |'snapshot.num-retained.max' = '2',
+ |'snapshot.num-retained.min' = '1',
+ |'write-only' = 'true')
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+ try {
+ // snapshot-1
+ inputData.addData((1, "a"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Nil)
+
+ // snapshot-2
+ inputData.addData((2, "b"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ // snapshot-3
+ inputData.addData((2, "b2"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+ // expire
+ checkAnswer(
+ spark.sql("CALL paimon.sys.expire_snapshots(table => 'test.T')"),
+ Row(1) :: Nil)
+
+ checkAnswer(
+ spark.sql("SELECT snapshot_id FROM paimon.test.`T$snapshots`"),
+ Row(2L) :: Row(3L) :: Nil)
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
+
+ test("Paimon Procedure: expire snapshots add options parameter") {
+ failAfter(streamingTimeout) {
+ withTempDir {
+ checkpointDir =>
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+                     |TBLPROPERTIES ('primary-key'='a', 'bucket'='3', 'write-only' = 'true')
+ |""".stripMargin)
+ val location = loadTable("T").location().toString
+
+ val inputData = MemoryStream[(Int, String)]
+ val stream = inputData
+ .toDS()
+ .toDF("a", "b")
+ .writeStream
+ .option("checkpointLocation", checkpointDir.getCanonicalPath)
+ .foreachBatch {
+ (batch: Dataset[Row], _: Long) =>
+ batch.write.format("paimon").mode("append").save(location)
+ }
+ .start()
+
+ val query = () => spark.sql("SELECT * FROM T ORDER BY a")
+
+ try {
+ // snapshot-1
+ inputData.addData((1, "a"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Nil)
+
+ // snapshot-2
+ inputData.addData((2, "b"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ // snapshot-3
+ inputData.addData((2, "b2"))
+ stream.processAllAvailable()
+ checkAnswer(query(), Row(1, "a") :: Row(2, "b2") :: Nil)
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.expire_snapshots(table => 'test.T', options
=> 'snapshot.num-retained.max=2, snapshot.num-retained.min=1')"),
+ Row(1L) :: Nil)
+
+ checkAnswer(
+ spark.sql("SELECT snapshot_id FROM paimon.test.`T$snapshots`"),
+ Row(2L) :: Row(3L) :: Nil)
+ } finally {
+ stream.stop()
+ }
+ }
+ }
+ }
+
def checkSnapshots(sm: SnapshotManager, earliest: Int, latest: Int): Unit = {
assertThat(sm.snapshotCount).isEqualTo(latest - earliest + 1)
assertThat(sm.earliestSnapshotId).isEqualTo(earliest)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index a205e69fd7..af82549738 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -278,7 +278,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
fileIO.writeFile(orphanFile2, "b", true)
checkAnswer(
- spark.sql("CALL paimon.sys.expire_snapshots(table => 'T', retain_max =>
1)"),
+ spark.sql("CALL paimon.sys.expire_snapshots(table => 'T', retain_max =>
1, retain_min => 1)"),
Row(2) :: Nil)
val older_than1 = new java.sql.Timestamp(
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
index 4f8ccae22d..8170c58364 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
@@ -379,7 +379,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase {
Assertions.assertEquals(2, statsFileCount(tableLocation, fileIO))
// test expire statistic
- spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1)")
+ spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1,
retain_min => 1)")
Assertions.assertEquals(1, statsFileCount(tableLocation, fileIO))
val orphanStats = new Path(tableLocation, "statistics/stats-orphan-0")