This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 1798e7a3e1 [flink] Add log for partition mark done trigger (#6146) 1798e7a3e1 is described below commit 1798e7a3e1c596f8e5bd6da13c88b817f2873898 Author: YeJunHao <41894543+leaves12...@users.noreply.github.com> AuthorDate: Wed Aug 27 10:19:36 2025 +0800 [flink] Add log for partition mark done trigger (#6146) --- .../sink/listener/PartitionMarkDoneTrigger.java | 29 ++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java index 439c4db3fd..0462582784 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java @@ -132,19 +132,26 @@ public class PartitionMarkDoneTrigger { if (timeInterval == null || idleTime == null) { return Collections.emptyList(); } + LOG.debug( + "End input is true and markDoneWhenEndInput is enabled, mark all pending partitions done: {}", + String.join(",", pendingPartitions.keySet())); List<String> needDone = new ArrayList<>(); Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator(); while (iter.hasNext()) { Map.Entry<String, Long> entry = iter.next(); String partition = entry.getKey(); - long lastUpdateTime = entry.getValue(); - long partitionStartTime; + LOG.debug( + "Partition {} is in progress, last update time: {}", + partition, + entry.getValue()); + long partitionStartTime; Optional<LocalDateTime> partitionLocalDateTimeOpt = extractDateTime(partition); // skip illegal partition if (!partitionLocalDateTimeOpt.isPresent()) { + LOG.debug("Partition {} is illegal, skip it", partition); iter.remove(); continue; } @@ -167,12 +174,30 @@ public class PartitionMarkDoneTrigger { } long partitionEndTime = partitionStartTime + timeInterval; lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime); + LOG.debug( + "Partition {} start time: {}, end time: {}, last update time after compare: {}", + partition, + partitionStartTime, + partitionEndTime, + lastUpdateTime); if (currentTimeMillis - lastUpdateTime > idleTime) { + LOG.debug( + "Partition {} is idle for {} greater than idleTime {}, mark it done", + partition, + currentTimeMillis - lastUpdateTime, + idleTime); needDone.add(partition); iter.remove(); + } else { + LOG.debug( + "Partition {} is idle for {} less than idleTime {}, no not mark it done", + partition, + currentTimeMillis - lastUpdateTime, + idleTime); } } + LOG.debug("Need done partitions: {}", String.join(",", needDone)); return needDone; }