This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a575911520 [HUDI-8433] Fix not update issuedOffset when stream read 
empty commits (#12166)
3a575911520 is described below

commit 3a57591152065ddb317c5fe67bab8163730f1e73
Author: fhan <[email protected]>
AuthorDate: Fri Nov 8 08:39:02 2024 +0800

    [HUDI-8433] Fix not update issuedOffset when stream read empty commits 
(#12166)
---
 .../hudi/source/StreamReadMonitoringFunction.java  | 10 ++++-
 .../source/TestStreamReadMonitoringFunction.java   | 49 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 0e3b1f0ce58..6a847a4c1a1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -214,8 +215,10 @@ public class StreamReadMonitoringFunction
     }
     IncrementalInputSplits.Result result =
         incrementalInputSplits.inputSplits(metaClient, this.issuedOffset, 
this.cdcEnabled);
-    if (result.isEmpty()) {
+
+    if (result.isEmpty() && StringUtils.isNullOrEmpty(result.getEndInstant())) 
{
       // no new instants, returns early
+      LOG.warn("Result is empty, do not update issuedInstant.");
       return;
     }
 
@@ -282,4 +285,9 @@ public class StreamReadMonitoringFunction
     readMetrics = new FlinkStreamReadMetrics(metrics);
     readMetrics.registerMetrics();
   }
+
+  public String getIssuedOffset() {
+    return issuedOffset;
+  }
+
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 5602ddaa8e3..db28ce1326f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -19,9 +19,13 @@
 package org.apache.hudi.source;
 
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.StreamerUtil;
@@ -42,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -51,6 +56,8 @@ import java.util.stream.Collectors;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -143,6 +150,48 @@ public class TestStreamReadMonitoringFunction {
     }
   }
 
+  @Test
+  public void testConsumeForSpeedLimitWhenEmptyCommitExists() throws Exception 
{
+    // Step1: create 4 empty commits
+    Configuration conf = new Configuration(this.conf);
+    conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+    conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true);
+
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+    TestData.writeData(Collections.EMPTY_LIST, conf);
+
+    HoodieTableMetaClient metaClient = 
HoodieTestUtils.init(conf.get(FlinkOptions.PATH), 
HoodieTableType.COPY_ON_WRITE);
+    HoodieTimeline commitsTimeline = metaClient.reloadActiveTimeline()
+        .filter(hoodieInstant -> 
hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+    HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+
+    // Step2: trigger streaming read from the first instant and set 
READ_COMMITS_LIMIT to 2
+    conf.set(FlinkOptions.READ_AS_STREAMING, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+    conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+    conf.set(FlinkOptions.READ_COMMITS_LIMIT, 2);
+    conf.set(FlinkOptions.READ_START_COMMIT, 
String.valueOf((Long.valueOf(firstInstant.getTimestamp()) - 100)));
+    StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = 
createHarness(function)) {
+      harness.setup();
+      harness.open();
+
+      CountDownLatch latch = new CountDownLatch(0);
+      CollectingSourceContext sourceContext = new 
CollectingSourceContext(latch);
+      function.monitorDirAndForwardSplits(sourceContext);
+      assertEquals(0, sourceContext.splits.size(), "There should be no 
inputSplits");
+
+      // Step3: assert that the current issuedOffset must not be null.
+      // Based on "IncrementalInputSplits#inputSplits => 
.startCompletionTime(issuedOffset != null ? issuedOffset : 
this.conf.getString(FlinkOptions.READ_START_COMMIT))"
+      // If issuedOffset were still null, hudi would take 
FlinkOptions.READ_START_COMMIT again, which means streaming read would be blocked.
+      assertNotNull(function.getIssuedOffset());
+      // Stop the stream task.
+      function.close();
+    }
+  }
+
   @Test
   public void testConsumeFromSpecifiedCommit() throws Exception {
     // write 2 commits first, use the second commit time as the specified 
start instant,

Reply via email to