This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 619b7504ca10dffc517771c212adcb64a3c13f47
Author: XuQianJin-Stars <[email protected]>
AuthorDate: Tue Aug 9 13:18:55 2022 +0800

    Reduce the scope and duration of holding checkpoint lock in stream read
---
 .../hudi/source/StreamReadMonitoringFunction.java  | 33 ++++++++++++----------
 1 file changed, 18 insertions(+), 15 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 3318cecf10..fde5130237 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -168,9 +168,7 @@ public class StreamReadMonitoringFunction
   public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) throws Exception {
     checkpointLock = context.getCheckpointLock();
     while (isRunning) {
-      synchronized (checkpointLock) {
-        monitorDirAndForwardSplits(context);
-      }
+      monitorDirAndForwardSplits(context);
       TimeUnit.SECONDS.sleep(interval);
     }
   }
@@ -195,6 +193,8 @@ public class StreamReadMonitoringFunction
       // table does not exist
       return;
     }
+
+    long start = System.currentTimeMillis();
     IncrementalInputSplits.Result result =
         incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
     if (result.isEmpty()) {
@@ -202,28 +202,31 @@ public class StreamReadMonitoringFunction
       return;
     }
 
-    for (MergeOnReadInputSplit split : result.getInputSplits()) {
-      context.collect(split);
+    LOG.debug(
+        "Discovered {} splits, time elapsed {}ms",
+        result.getInputSplits().size(),
+        System.currentTimeMillis() - start);
+
+    // only need to hold the checkpoint lock when emitting the splits
+    start = System.currentTimeMillis();
+    synchronized (checkpointLock) {
+      for (MergeOnReadInputSplit split : result.getInputSplits()) {
+        context.collect(split);
+      }
     }
+
     // update the issues instant time
     this.issuedInstant = result.getEndInstant();
     LOG.info("\n"
             + "------------------------------------------------------------\n"
-            + "---------- consumed to instant: {}\n"
+            + "---------- consumed to instant: {}, time elapsed {}ms\n"
             + "------------------------------------------------------------",
-        this.issuedInstant);
+        this.issuedInstant, System.currentTimeMillis() - start);
   }
 
   @Override
   public void close() throws Exception {
-    super.close();
-
-    if (checkpointLock != null) {
-      synchronized (checkpointLock) {
-        issuedInstant = null;
-        isRunning = false;
-      }
-    }
+    cancel();
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Closed File Monitoring Source for path: " + path + ".");

Reply via email to