This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5c3e5  [HUDI-2415]  Add more info log for flink streaming reader (#3642)
9d5c3e5 is described below

commit 9d5c3e5cb92a4247bb1fc9a4a0e2eb3d2fbce1d6
Author: Danny Chan <[email protected]>
AuthorDate: Sun Sep 12 10:00:17 2021 +0800

    [HUDI-2415]  Add more info log for flink streaming reader (#3642)
---
 .../org/apache/hudi/source/StreamReadMonitoringFunction.java | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index ec56903..c5610d2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -248,6 +248,13 @@ public class StreamReadMonitoringFunction
     List<HoodieCommitMetadata> activeMetadataList = instants.stream()
         .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
     List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
+    if (archivedMetadataList.size() > 0) {
+      LOG.warn(""
+          + "--------------------------------------------------------------------------------\n"
+          + "---------- caution: the reader has fall behind too much from the writer,\n"
+          + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
+          + "--------------------------------------------------------------------------------");
+    }
     List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
         ? mergeList(activeMetadataList, archivedMetadataList)
         : activeMetadataList;
@@ -288,6 +295,11 @@ public class StreamReadMonitoringFunction
     }
     // update the issues instant time
     this.issuedInstant = commitToIssue;
+    LOG.info(""
+        + "------------------------------------------------------------\n"
+        + "---------- consumed to instant: {}\n"
+        + "------------------------------------------------------------",
+        commitToIssue);
   }
 
   @Override

Reply via email to