This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 95831a338 [core] Refactor exception handling of PartitionTimeExtractor (#3806)
95831a338 is described below
commit 95831a338c6563e44ded79d1caf162bd3d28cfaa
Author: yuzelin <[email protected]>
AuthorDate: Sun Jul 28 11:21:56 2024 +0800
[core] Refactor exception handling of PartitionTimeExtractor (#3806)
---
.../paimon/partition/PartitionTimeExtractor.java | 47 +++++-----------------
.../PartitionValuesTimeExpireStrategy.java | 30 +++++++++++++-
.../partition/PartitionTimeExtractorTest.java | 18 +++++++++
.../sink/partition/PartitionMarkDoneTrigger.java | 15 ++++++-
.../partition/PartitionMarkDoneTriggerTest.java | 20 +++++++++
5 files changed, 89 insertions(+), 41 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 36ca92382..0016619e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -18,11 +18,6 @@
package org.apache.paimon.partition;
-import org.apache.paimon.CoreOptions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import javax.annotation.Nullable;
import java.time.LocalDate;
@@ -39,8 +34,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
@@ -52,8 +45,6 @@ import static java.time.temporal.ChronoField.YEAR;
/** Time extractor to extract time from partition values. */
public class PartitionTimeExtractor {
- private static final Logger LOG =
LoggerFactory.getLogger(PartitionTimeExtractor.class);
-
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
new DateTimeFormatterBuilder()
.appendValue(YEAR, 1, 10, SignStyle.NORMAL)
@@ -98,36 +89,18 @@ public class PartitionTimeExtractor {
}
public LocalDateTime extract(List<String> partitionKeys, List<?>
partitionValues) {
- LocalDateTime dateTime = null;
- try {
- String timestampString;
- if (pattern == null) {
- timestampString = partitionValues.get(0).toString();
- } else {
- timestampString = pattern;
- for (int i = 0; i < partitionKeys.size(); i++) {
- timestampString =
- timestampString.replaceAll(
- "\\$" + partitionKeys.get(i),
- partitionValues.get(i).toString());
- }
+ String timestampString;
+ if (pattern == null) {
+ timestampString = partitionValues.get(0).toString();
+ } else {
+ timestampString = pattern;
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ timestampString =
+ timestampString.replaceAll(
+ "\\$" + partitionKeys.get(i),
partitionValues.get(i).toString());
}
- dateTime = toLocalDateTime(timestampString, this.formatter);
- } catch (Exception e) {
- String partitionInfos =
- IntStream.range(0, partitionKeys.size())
- .mapToObj(i -> partitionKeys.get(i) + ":" +
partitionValues.get(i))
- .collect(Collectors.joining(","));
- LOG.warn(
- "Partition {} can't uses '{}' formatter to extract
datetime to expire."
- + " Please check the partition expiration
configuration or"
- + " manually delete the partition using the
drop-partition command or"
- + " use 'update-time' expiration strategy by set
{}, the strategy support non-date formatted partition.",
- partitionInfos,
- this.formatter,
- CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
}
- return dateTime;
+ return toLocalDateTime(timestampString, this.formatter);
}
private static LocalDateTime toLocalDateTime(
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
index 5efe8b910..37cbed530 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
@@ -26,9 +26,15 @@ import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.types.RowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* A partition expiration policy that compare the time extracted from the
partition with the current
@@ -36,6 +42,9 @@ import java.util.List;
*/
public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy
{
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PartitionValuesTimeExpireStrategy.class);
+
private final PartitionTimeExtractor timeExtractor;
public PartitionValuesTimeExpireStrategy(CoreOptions options, RowType
partitionType) {
@@ -64,8 +73,25 @@ public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {
@Override
public boolean test(BinaryRow partition) {
Object[] array = convertPartition(partition);
- LocalDateTime partTime = timeExtractor.extract(partitionKeys,
Arrays.asList(array));
- return partTime != null && expireDateTime.isAfter(partTime);
+ try {
+ LocalDateTime partTime = timeExtractor.extract(partitionKeys,
Arrays.asList(array));
+ return expireDateTime.isAfter(partTime);
+ } catch (DateTimeParseException e) {
+ String partitionInfo =
+ IntStream.range(0, partitionKeys.size())
+ .mapToObj(i -> partitionKeys.get(i) + ":" +
array[i])
+ .collect(Collectors.joining(","));
+ LOG.warn(
+ "Can't extract datetime from partition {}. If you want
to configure partition expiration, please:\n"
+ + " 1. Check the expiration configuration.\n"
+ + " 2. Manually delete the partition using
the drop-partition command if the partition"
+ + " value is non-date formatted.\n"
+ + " 3. Use '{}' expiration strategy by set
'{}', which supports non-date formatted partition.",
+ partitionInfo,
+ CoreOptions.PartitionExpireStrategy.UPDATE_TIME,
+ CoreOptions.PARTITION_EXPIRATION_STRATEGY.key());
+ return false;
+ }
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java
index ca9b4869b..3f6cff6ee 100644
--- a/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/partition/PartitionTimeExtractorTest.java
@@ -18,13 +18,17 @@
package org.apache.paimon.partition;
+import org.apache.paimon.testutils.assertj.PaimonAssertions;
+
import org.junit.jupiter.api.Test;
import java.time.LocalDateTime;
+import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link PartitionTimeExtractor}. */
public class PartitionTimeExtractorTest {
@@ -87,4 +91,18 @@ public class PartitionTimeExtractorTest {
Collections.emptyList(),
Collections.singletonList("20230101")))
.isEqualTo(LocalDateTime.parse("2023-01-01T00:00:00"));
}
+
+ @Test
+ public void testExtractNonDateFormattedPartition() {
+ PartitionTimeExtractor extractor = new PartitionTimeExtractor("$ds",
"yyyyMMdd");
+ assertThatThrownBy(
+ () ->
+ extractor.extract(
+ Collections.singletonList("ds"),
+ Collections.singletonList("unknown")))
+ .satisfies(
+ PaimonAssertions.anyCauseMatches(
+ DateTimeParseException.class,
+ "Text 'unknown' could not be parsed at index
0"));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index ed168387b..a39b5f18a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.partition;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionTimeExtractor;
@@ -31,7 +32,9 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import java.time.Duration;
+import java.time.LocalDateTime;
import java.time.ZoneId;
+import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -123,8 +126,7 @@ public class PartitionMarkDoneTrigger {
long lastUpdateTime = entry.getValue();
long partitionStartTime =
- timeExtractor
- .extract(extractPartitionSpecFromPath(new
Path(partition)))
+ extractDateTime(partition)
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
@@ -139,6 +141,15 @@ public class PartitionMarkDoneTrigger {
return needDone;
}
+ @VisibleForTesting
+ LocalDateTime extractDateTime(String partition) {
+ try {
+ return timeExtractor.extract(extractPartitionSpecFromPath(new
Path(partition)));
+ } catch (DateTimeParseException e) {
+ throw new RuntimeException("Can't extract datetime from partition
" + partition, e);
+ }
+ }
+
public void snapshotState() throws Exception {
state.update(new ArrayList<>(pendingPartitions.keySet()));
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
index f2f6f47c5..b00906d1c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink.partition;
import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -32,6 +33,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
class PartitionMarkDoneTriggerTest {
@@ -142,6 +144,24 @@ class PartitionMarkDoneTriggerTest {
assertThat(partitions).containsOnly("dt=2024-02-02");
}
+ @Test
+ public void testParseNonDateFormattedPartition() throws Exception {
+ PartitionMarkDoneTrigger trigger =
+ new PartitionMarkDoneTrigger(
+ state,
+ extractor,
+ timeInterval,
+ idleTime,
+ toEpochMillis("2024-02-01"),
+ true);
+
+ assertThatThrownBy(() -> trigger.extractDateTime("unknown"))
+ .satisfies(
+ PaimonAssertions.anyCauseMatches(
+ RuntimeException.class,
+ "Can't extract datetime from partition
unknown"));
+ }
+
private long toEpochMillis(String dt) {
return LocalDateTime.of(LocalDate.parse(dt), LocalTime.MIN)
.atZone(ZoneId.systemDefault())