This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4185189fa [core] check metastore only for done_partition markdown action (#3456)
4185189fa is described below
commit 4185189fa62d817f604be1b58d5968bb30b35ae6
Author: wangwj <[email protected]>
AuthorDate: Mon Jun 3 15:38:07 2024 +0800
[core] check metastore only for done_partition markdown action (#3456)
---
.../paimon/partition/PartitionTimeExtractor.java | 4 +++-
.../flink/sink/partition/PartitionMarkDone.java | 19 ++++++++++++-------
.../sink/partition/PartitionMarkDoneTrigger.java | 12 ++++++------
.../sink/partition/PartitionMarkDoneTriggerTest.java | 6 +++---
4 files changed, 24 insertions(+), 17 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
index 968fff8ad..af1a4ef9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionTimeExtractor.java
@@ -117,7 +117,9 @@ public class PartitionTimeExtractor {
.mapToObj(i -> partitionKeys.get(i) + ":" +
partitionValues.get(i))
.collect(Collectors.joining(","));
LOG.warn(
- "Parition {} can't be extract datetime to expire,Please check the partition expiration configuration or manually delete the partition using the drop-partition command. ",
+ "Partition {} can't be extract datetime to expire."
+ + " Please check the partition expiration configuration or"
+ + " manually delete the partition using the drop-partition command.",
paritionInfos);
}
return dateTime;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index e39c9c807..c712682ca 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -93,12 +93,17 @@ public class PartitionMarkDone implements Closeable {
MetastoreClient.Factory metastoreClientFactory =
table.catalogEnvironment().metastoreClientFactory();
- checkNotNull(
- metastoreClientFactory, "Cannot mark done partition for table
without metastore.");
- checkArgument(
- coreOptions.partitionedTableInMetastore(),
- "Table should enable %s",
- METASTORE_PARTITIONED_TABLE.key());
+
+ String partitionMarkDownAction =
options.get(PARTITION_MARK_DONE_ACTION);
+ if (partitionMarkDownAction.contains("done-partition")) {
+ checkNotNull(
+ metastoreClientFactory,
+ "Cannot mark done partition for table without metastore.");
+ checkArgument(
+ coreOptions.partitionedTableInMetastore(),
+ "Table should enable %s",
+ METASTORE_PARTITIONED_TABLE.key());
+ }
InternalRowPartitionComputer partitionComputer =
new InternalRowPartitionComputer(
@@ -116,7 +121,7 @@ public class PartitionMarkDone implements Closeable {
idleToDone);
List<PartitionMarkDoneAction> actions =
- Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
+ Arrays.asList(partitionMarkDownAction.split(",")).stream()
.map(
action -> {
switch (action) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
index 946d2b209..9309938e2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTrigger.java
@@ -37,29 +37,29 @@ public class PartitionMarkDoneTrigger {
private final State state;
private final PartitionTimeExtractor timeExtractor;
- private final long timeInternal;
+ private final long timeInterval;
private final long idleTime;
private final Map<String, Long> pendingPartitions;
public PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
- Duration timeInternal,
+ Duration timeInterval,
Duration idleTime)
throws Exception {
- this(state, timeExtractor, timeInternal, idleTime,
System.currentTimeMillis());
+ this(state, timeExtractor, timeInterval, idleTime,
System.currentTimeMillis());
}
PartitionMarkDoneTrigger(
State state,
PartitionTimeExtractor timeExtractor,
- Duration timeInternal,
+ Duration timeInterval,
Duration idleTime,
long currentTimeMillis)
throws Exception {
this.state = state;
this.timeExtractor = timeExtractor;
- this.timeInternal = timeInternal.toMillis();
+ this.timeInterval = timeInterval.toMillis();
this.idleTime = idleTime.toMillis();
this.pendingPartitions = new HashMap<>();
state.restore().forEach(p -> pendingPartitions.put(p,
currentTimeMillis));
@@ -93,7 +93,7 @@ public class PartitionMarkDoneTrigger {
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
- long partitionEndTime = partitionStartTime + timeInternal;
+ long partitionEndTime = partitionStartTime + timeInterval;
lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);
if (currentTimeMillis - lastUpdateTime > idleTime) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
index b3c636308..fd66214e3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTriggerTest.java
@@ -52,11 +52,11 @@ class PartitionMarkDoneTriggerTest {
};
PartitionTimeExtractor extractor = new PartitionTimeExtractor("$dt",
"yyyy-MM-dd");
- Duration timeInternal = Duration.ofDays(1);
+ Duration timeInterval = Duration.ofDays(1);
Duration idleTime = Duration.ofMinutes(15);
PartitionMarkDoneTrigger trigger =
new PartitionMarkDoneTrigger(
- state, extractor, timeInternal, idleTime,
toEpochMillis("2024-02-01"));
+ state, extractor, timeInterval, idleTime,
toEpochMillis("2024-02-01"));
// test not reach partition end + idle time
trigger.notifyPartition("dt=2024-02-02", toEpochMillis("2024-02-01"));
@@ -91,7 +91,7 @@ class PartitionMarkDoneTriggerTest {
pendingPartitions.add("dt=2024-02-04");
trigger =
new PartitionMarkDoneTrigger(
- state, extractor, timeInternal, idleTime,
toEpochMillis("2024-02-06"));
+ state, extractor, timeInterval, idleTime,
toEpochMillis("2024-02-06"));
partitions = trigger.donePartitions(toEpochMillis("2024-02-06"));
assertThat(partitions).isEmpty();
partitions = trigger.donePartitions(toEpochMillis("2024-02-06") +
idleTime.toMillis() + 1);