This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1798e7a3e1 [flink] Add log for partition mark done trigger (#6146)
1798e7a3e1 is described below

commit 1798e7a3e1c596f8e5bd6da13c88b817f2873898
Author: YeJunHao <41894543+leaves12...@users.noreply.github.com>
AuthorDate: Wed Aug 27 10:19:36 2025 +0800

    [flink] Add log for partition mark done trigger (#6146)
---
 .../sink/listener/PartitionMarkDoneTrigger.java    | 29 ++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
index 439c4db3fd..0462582784 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneTrigger.java
@@ -132,19 +132,26 @@ public class PartitionMarkDoneTrigger {
         if (timeInterval == null || idleTime == null) {
             return Collections.emptyList();
         }
+        LOG.debug(
+                "End input is true and markDoneWhenEndInput is enabled, mark all pending partitions done: {}",
+                String.join(",", pendingPartitions.keySet()));
 
         List<String> needDone = new ArrayList<>();
         Iterator<Map.Entry<String, Long>> iter = pendingPartitions.entrySet().iterator();
         while (iter.hasNext()) {
             Map.Entry<String, Long> entry = iter.next();
             String partition = entry.getKey();
-
             long lastUpdateTime = entry.getValue();
-            long partitionStartTime;
+            LOG.debug(
+                    "Partition {} is in progress, last update time: {}",
+                    partition,
+                    entry.getValue());
 
+            long partitionStartTime;
             Optional<LocalDateTime> partitionLocalDateTimeOpt = extractDateTime(partition);
             // skip illegal partition
             if (!partitionLocalDateTimeOpt.isPresent()) {
+                LOG.debug("Partition {} is illegal, skip it", partition);
                 iter.remove();
                 continue;
             }
@@ -167,12 +174,30 @@ public class PartitionMarkDoneTrigger {
             }
             long partitionEndTime = partitionStartTime + timeInterval;
             lastUpdateTime = Math.max(lastUpdateTime, partitionEndTime);
+            LOG.debug(
+                    "Partition {} start time: {}, end time: {}, last update time after compare: {}",
+                    partition,
+                    partitionStartTime,
+                    partitionEndTime,
+                    lastUpdateTime);
 
             if (currentTimeMillis - lastUpdateTime > idleTime) {
+                LOG.debug(
+                        "Partition {} is idle for {} greater than idleTime {}, mark it done",
+                        partition,
+                        currentTimeMillis - lastUpdateTime,
+                        idleTime);
                 needDone.add(partition);
                 iter.remove();
+            } else {
+                LOG.debug(
+                        "Partition {} is idle for {} less than idleTime {}, no not mark it done",
+                        partition,
+                        currentTimeMillis - lastUpdateTime,
+                        idleTime);
             }
         }
+        LOG.debug("Need done partitions: {}", String.join(",", needDone));
         return needDone;
     }
 

Reply via email to