This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 95831a338 [core] Refactor exception handling of PartitionTimeExtractor (#3806)
95831a338 is described below

commit 95831a338c6563e44ded79d1caf162bd3d28cfaa
Author: yuzelin <[email protected]>
AuthorDate: Sun Jul 28 11:21:56 2024 +0800

    [core] Refactor exception handling of PartitionTimeExtractor (#3806)
---
 .../paimon/partition/PartitionTimeExtractor.java   | 47 +++++-----------------
 .../PartitionValuesTimeExpireStrategy.java         | 30 +++++++++++++-
 .../partition/PartitionTimeExtractorTest.java      | 18 +++++++++
 .../sink/partition/PartitionMarkDoneTrigger.java   | 15 ++++++-
 .../partition/PartitionMarkDoneTriggerTest.java    | 20 +++++++++
 5 files changed, 89 insertions(+), 41 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 36ca92382..0016619e2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -18,11 +18,6 @@
 
 package org.apache.paimon.partition;
 
-import org.apache.paimon.CoreOptions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import javax.annotation.Nullable;
 
 import java.time.LocalDate;
@@ -39,8 +34,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static java.time.temporal.ChronoField.DAY_OF_MONTH;
 import static java.time.temporal.ChronoField.HOUR_OF_DAY;
@@ -52,8 +45,6 @@ import static java.time.temporal.ChronoField.YEAR;
 /** Time extractor to extract time from partition values. */
 public class PartitionTimeExtractor {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(PartitionTimeExtractor.class);
-
     private static final DateTimeFormatter TIMESTAMP_FORMATTER =
             new DateTimeFormatterBuilder()
                     .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
@@ -98,36 +89,18 @@ public class PartitionTimeExtractor {
     }
 
     public LocalDateTime extract(List<String> partitionKeys, List<?> 
partitionValues) {
-        LocalDateTime dateTime = null;
-        try {
-            String timestampString;
-            if (pattern == null) {
-                timestampString = partitionValues.get(0).toString();
-            } else {
-                timestampString = pattern;
-                for (int i = 0; i < partitionKeys.size(); i++) {
-                    timestampString =
-                            timestampString.replaceAll(
-                                    "\\$" + partitionKeys.get(i),
-                                    partitionValues.get(i).toString());
-                }
+        String timestampString;
+        if (pattern == null) {
+            timestampString = partitionValues.get(0).toString();
+        } else {
+            timestampString = pattern;
+            for (int i = 0; i < partitionKeys.size(); i++) {
+                timestampString =
+                        timestampString.replaceAll(
+                                "\\$" + partitionKeys.get(i), 
partitionValues.get(i).toString());
             }
-            dateTime = toLocalDateTime(timestampString, this.formatter);
-        } catch (Exception e) {
-            String partitionInfos =
-                    IntStream.range(0, partitionKeys.size())
-                            .mapToObj(i -> partitionKeys.get(i) + ":" + 
partitionValues.get(i))
-                            .collect(Collectors.joining(","));
-            LOG.warn(
-                    "Partition {} can't uses '{}' formatter to extract 
datetime to expire."
-                            + " Please check the partition expiration 
configuration or"
-                            + " manually delete the partition using the 
drop-partition command or"
-                            + " use 'update-time' expiration strategy by set 
{}, the strategy support non-date formatted partition.",
-                    partitionInfos,
-                    this.formatter,
-                    CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
         }
-        return dateTime;
+        return toLocalDateTime(timestampString, this.formatter);
     }
 
     private static LocalDateTime toLocalDateTime(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
index 5efe8b910..37cbed530 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
@@ -26,9 +26,15 @@ import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.types.RowType;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /**
  * A partition expiration policy that compare the time extracted from the 
partition with the current
@@ -36,6 +42,9 @@ import java.util.List;
  */
 public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy 
{
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class);
+
     private final PartitionTimeExtractor timeExtractor;
 
     public PartitionValuesTimeExpireStrategy(CoreOptions options, RowType 
partitionType) {
@@ -64,8 +73,25 @@ public class PartitionValuesTimeExpireStrategy extends 
PartitionExpireStrategy {
         @Override
         public boolean test(BinaryRow partition) {
             Object[] array = convertPartition(partition);
-            LocalDateTime partTime = timeExtractor.extract(partitionKeys, 
Arrays.asList(array));
-            return partTime != null && expireDateTime.isAfter(partTime);
+            try {
+                LocalDateTime partTime = timeExtractor.extract(partitionKeys, 
Arrays.asList(array));
+                return expireDateTime.isAfter(partTime);
+            } catch (DateTimeParseException e) {
+                String partitionInfo =
+                        IntStream.range(0, partitionKeys.size())
+                                .mapToObj(i -> partitionKeys.get(i) + ":" + 
array[i])
+                                .collect(Collectors.joining(","));
+                LOG.warn(
+                        "Can't extract datetime from partition {}. If you want 
to configure partition expiration, please:\n"
+                                + "  1. Check the expiration configuration.\n"
+                                + "  2. Manually delete the partition using 
the drop-partition command if the partition"
+                                + " value is non-date formatted.\n"
+                                + "  3. Use '{}' expiration strategy by set 
'{}', which supports non-date formatted partition.",
+                        partitionInfo,
+                        CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
+                        CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
+                return false;
+            }
         }
 
         @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java
index ca9b4869b..3f6cff6ee 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java
@@ -18,13 +18,17 @@
 
 package org.apache.paimon.partition;
 
+import org.apache.paimon.testutils.assertj.PaimonAssertions;
+
 import org.junit.jupiter.api.Test;
 
 import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
 import java.util.Arrays;
 import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link PartitionTimeExtractor}. */
 public class PartitionTimeExtractorTest {
@@ -87,4 +91,18 @@ public class PartitionTimeExtractorTest {
                                 Collections.emptyList(), 
Collections.singletonList("20230101")))
                 .isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));
     }
+
+    @Test
+    public void testExtractNonDateFormattedPartition() {
+        PartitionTimeExtractor extractor = new PartitionTimeExtractor("$ds", 
"yyyyMMdd");
+        assertThatThrownBy(
+                        () ->
+                                extractor.extract(
+                                        Collections.singletonList("ds"),
+                                        Collections.singletonList("unknown")))
+                .satisfies(
+                        PaimonAssertions.anyCauseMatches(
+                                DateTimeParseException.class,
+                                "Text 'unknown' could not be parsed at index 
0"));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index ed168387b..a39b5f18a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.partition;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionTimeExtractor;
@@ -31,7 +32,9 @@ import 
org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 
 import java.time.Duration;
+import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -123,8 +126,7 @@ public class PartitionMarkDoneTrigger {
 
             long lastUpdateTime = entry.getValue();
             long partitionStartTime =
-                    timeExtractor
-                            .extract(extractPartitionSpecFromPath(new 
Path(partition)))
+                    extractDateTime(partition)
                             .atZone(ZoneId.systemDefault())
                             .toInstant()
                             .toEpochMilli();
@@ -139,6 +141,15 @@ public class PartitionMarkDoneTrigger {
         return needDone;
     }
 
+    @VisibleForTesting
+    LocalDateTime extractDateTime(String partition) {
+        try {
+            return timeExtractor.extract(extractPartitionSpecFromPath(new 
Path(partition)));
+        } catch (DateTimeParseException e) {
+            throw new RuntimeException("Can't extract datetime from partition 
" + partition, e);
+        }
+    }
+
     public void snapshotState() throws Exception {
         state.update(new ArrayList<>(pendingPartitions.keySet()));
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
index f2f6f47c5..b00906d1c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.sink.partition;
 
 import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.apache.paimon.testutils.assertj.PaimonAssertions;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class PartitionMarkDoneTriggerTest {
 
@@ -142,6 +144,24 @@ class PartitionMarkDoneTriggerTest {
         assertThat(partitions).containsOnly("dt=2024-02-02");
     }
 
+    @Test
+    public void testParseNonDateFormattedPartition() throws Exception {
+        PartitionMarkDoneTrigger trigger =
+                new PartitionMarkDoneTrigger(
+                        state,
+                        extractor,
+                        timeInterval,
+                        idleTime,
+                        toEpochMillis("2024-02-01"),
+                        true);
+
+        assertThatThrownBy(() -> trigger.extractDateTime("unknown"))
+                .satisfies(
+                        PaimonAssertions.anyCauseMatches(
+                                RuntimeException.class,
+                                "Can't extract datetime from partition 
unknown"));
+    }
+
     private long toEpochMillis(String dt) {
         return LocalDateTime.of(LocalDate.parse(dt), LocalTime.MIN)
                 .atZone(ZoneId.systemDefault())

Reply via email to