This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4185189fa [core] check metastore only for done_partition markdown action (#3456)
4185189fa is described below

commit 4185189fa62d817f604be1b58d5968bb30b35ae6
Author: wangwj <[email protected]>
AuthorDate: Mon Jun 3 15:38:07 2024 +0800

    [core] check metastore only for done_partition markdown action (#3456)
---
 .../paimon/partition/PartitionTimeExtractor.java      |  4 +++-
 .../flink/sink/partition/PartitionMarkDone.java       | 19 ++++++++++++-------
 .../sink/partition/PartitionMarkDoneTrigger.java      | 12 ++++++------
 .../sink/partition/PartitionMarkDoneTriggerTest.java  |  6 +++---
 4 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 968fff8ad..af1a4ef9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -117,7 +117,9 @@ public class PartitionTimeExtractor {
                             .mapToObj(i -> partitionKeys.get(i) + ":" + partitionValues.get(i))
                             .collect(Collectors.joining(","));
             LOG.warn(
-                    "Parition {} can't be extract datetime to expire,Please check the partition expiration configuration or manually delete the partition using the drop-partition command. ",
+                    "Partition {} can't be extract datetime to expire."
+                            + " Please check the partition expiration configuration or"
+                            + " manually delete the partition using the drop-partition command.",
                     paritionInfos);
         }
         return dateTime;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index e39c9c807..c712682ca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -93,12 +93,17 @@ public class PartitionMarkDone implements Closeable {
 
         MetastoreClient.Factory metastoreClientFactory =
                 table.catalogEnvironment().metastoreClientFactory();
-        checkNotNull(
-                metastoreClientFactory, "Cannot mark done partition for table without metastore.");
-        checkArgument(
-                coreOptions.partitionedTableInMetastore(),
-                "Table should enable %s",
-                METASTORE_PARTITIONED_TABLE.key());
+
+        String partitionMarkDownAction = options.get(PARTITION_MARK_DONE_ACTION);
+        if (partitionMarkDownAction.contains("done-partition")) {
+            checkNotNull(
+                    metastoreClientFactory,
+                    "Cannot mark done partition for table without metastore.");
+            checkArgument(
+                    coreOptions.partitionedTableInMetastore(),
+                    "Table should enable %s",
+                    METASTORE_PARTITIONED_TABLE.key());
+        }
 
         InternalRowPartitionComputer partitionComputer =
                 new InternalRowPartitionComputer(
@@ -116,7 +121,7 @@ public class PartitionMarkDone implements Closeable {
                         idleToDone);
 
         List<PartitionMarkDoneAction> actions =
-                Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
+                Arrays.asList(partitionMarkDownAction.split(",")).stream()
                         .map(
                                 action -> {
                                     switch (action) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index 946d2b209..9309938e2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -37,29 +37,29 @@ public class PartitionMarkDoneTrigger {
 
     private final State state;
     private final PartitionTimeExtractor timeExtractor;
-    private final long timeInternal;
+    private final long timeInterval;
     private final long idleTime;
     private final Map<String, Long> pendingPartitions;
 
     public PartitionMarkDoneTrigger(
             State state,
             PartitionTimeExtractor timeExtractor,
-            Duration timeInternal,
+            Duration timeInterval,
             Duration idleTime)
             throws Exception {
-        this(state, timeExtractor, timeInternal, idleTime, System.currentTimeMillis());
+        this(state, timeExtractor, timeInterval, idleTime, System.currentTimeMillis());
     }
 
     PartitionMarkDoneTrigger(
             State state,
             PartitionTimeExtractor timeExtractor,
-            Duration timeInternal,
+            Duration timeInterval,
             Duration idleTime,
             long currentTimeMillis)
             throws Exception {
         this.state = state;
         this.timeExtractor = timeExtractor;
-        this.timeInternal = timeInternal.toMillis();
+        this.timeInterval = timeInterval.toMillis();
         this.idleTime = idleTime.toMillis();
         this.pendingPartitions = new HashMap<>();
         state.restore().forEach(p -> pendingPartitions.put(p, currentTimeMillis));
@@ -93,7 +93,7 @@ public class PartitionMarkDoneTrigger {
                             .atZone(ZoneId.systemDefault())
                             .toInstant()
                             .toEpochMilli();
-            long partitionEndTime = partitionStartTime + timeInternal;
+            long partitionEndTime = partitionStartTime + timeInterval;
             lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);
 
             if (currentTimeMillis - lastUpdateTime > idleTime) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
index b3c636308..fd66214e3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
@@ -52,11 +52,11 @@ class PartitionMarkDoneTriggerTest {
                 };
 
         PartitionTimeExtractor extractor = new PartitionTimeExtractor("$dt", "yyyy-MM-dd");
-        Duration timeInternal = Duration.ofDays(1);
+        Duration timeInterval = Duration.ofDays(1);
         Duration idleTime = Duration.ofMinutes(15);
         PartitionMarkDoneTrigger trigger =
                 new PartitionMarkDoneTrigger(
-                        state, extractor, timeInternal, idleTime, toEpochMillis("2024-02-01"));
+                        state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-01"));
 
         // test not reach partition end + idle time
         trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01"));
@@ -91,7 +91,7 @@ class PartitionMarkDoneTriggerTest {
         pendingPartitions.add("dt=2024-02-04");
         trigger =
                 new PartitionMarkDoneTrigger(
-                        state, extractor, timeInternal, idleTime, toEpochMillis("2024-02-06"));
+                        state, extractor, timeInterval, idleTime, toEpochMillis("2024-02-06"));
         partitions = trigger.donePartitions(toEpochMillis("2024-02-06"));
         assertThat(partitions).isEmpty();
         partitions = trigger.donePartitions(toEpochMillis("2024-02-06") + idleTime.toMillis() + 1);

Reply via email to