This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 619b7504ca10dffc517771c212adcb64a3c13f47 Author: XuQianJin-Stars <[email protected]> AuthorDate: Tue Aug 9 13:18:55 2022 +0800 Reduce the scope and duration of holding checkpoint lock in stream read --- .../hudi/source/StreamReadMonitoringFunction.java | 33 ++++++++++++---------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index 3318cecf10..fde5130237 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -168,9 +168,7 @@ public class StreamReadMonitoringFunction public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) throws Exception { checkpointLock = context.getCheckpointLock(); while (isRunning) { - synchronized (checkpointLock) { - monitorDirAndForwardSplits(context); - } + monitorDirAndForwardSplits(context); TimeUnit.SECONDS.sleep(interval); } } @@ -195,6 +193,8 @@ public class StreamReadMonitoringFunction // table does not exist return; } + + long start = System.currentTimeMillis(); IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant); if (result.isEmpty()) { @@ -202,28 +202,31 @@ public class StreamReadMonitoringFunction return; } - for (MergeOnReadInputSplit split : result.getInputSplits()) { - context.collect(split); + LOG.debug( + "Discovered {} splits, time elapsed {}ms", + result.getInputSplits().size(), + System.currentTimeMillis() - start); + + // only need to hold the checkpoint lock when emitting the splits + start = System.currentTimeMillis(); + synchronized (checkpointLock) { + for (MergeOnReadInputSplit split : result.getInputSplits()) { + context.collect(split); + } } + // update the issues instant time this.issuedInstant = result.getEndInstant(); LOG.info("\n" + "------------------------------------------------------------\n" - + "---------- consumed to instant: {}\n" + + "---------- consumed to instant: {}, time elapsed {}ms\n" + "------------------------------------------------------------", - this.issuedInstant); + this.issuedInstant, System.currentTimeMillis() - start); } @Override public void close() throws Exception { - super.close(); - - if (checkpointLock != null) { - synchronized (checkpointLock) { - issuedInstant = null; - isRunning = false; - } - } + cancel(); if (LOG.isDebugEnabled()) { LOG.debug("Closed File Monitoring Source for path: " + path + ".");
