This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3a575911520 [HUDI-8433] Fix not update issuedOffset when stream read empty commits (#12166)
3a575911520 is described below
commit 3a57591152065ddb317c5fe67bab8163730f1e73
Author: fhan <[email protected]>
AuthorDate: Fri Nov 8 08:39:02 2024 +0800
[HUDI-8433] Fix not update issuedOffset when stream read empty commits (#12166)
---
.../hudi/source/StreamReadMonitoringFunction.java | 10 ++++-
.../source/TestStreamReadMonitoringFunction.java | 49 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
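What the change does: previously the monitor returned early whenever the incremental scan produced no input splits, so a run of empty commits never advanced issuedOffset and every subsequent poll restarted from the same offset. The sketch below is a minimal, self-contained model of the fixed guard; the Result record and method names are simplified stand-ins for illustration, not Hudi's actual IncrementalInputSplits.Result API, and the explicit null/empty check stands in for StringUtils.isNullOrEmpty:

    import java.util.List;

    class MonitorGuardSketch {
      // Simplified stand-in for IncrementalInputSplits.Result: the split list
      // can be empty while endInstant is still set, e.g. when the scanned
      // commits wrote no data.
      record Result(List<String> splits, String endInstant) {
        boolean isEmpty() { return splits.isEmpty(); }
        String getEndInstant() { return endInstant; }
      }

      private String issuedOffset;

      void onScanResult(Result result) {
        // Old behavior: `if (result.isEmpty()) { return; }` -- empty commits
        // never advanced issuedOffset.
        if (result.isEmpty() && (result.getEndInstant() == null || result.getEndInstant().isEmpty())) {
          return; // genuinely nothing new on the timeline
        }
        // Empty splits but a valid end instant: advance the offset anyway so
        // the next poll resumes after the empty commits instead of
        // re-scanning the same window.
        this.issuedOffset = result.getEndInstant();
      }
    }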
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 0e3b1f0ce58..6a847a4c1a1 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -19,6 +19,7 @@
package org.apache.hudi.source;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -214,8 +215,10 @@ public class StreamReadMonitoringFunction
}
    IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, this.issuedOffset, this.cdcEnabled);
- if (result.isEmpty()) {
+
+    if (result.isEmpty() && StringUtils.isNullOrEmpty(result.getEndInstant())) {
// no new instants, returns early
+ LOG.warn("Result is empty, do not update issuedInstant.");
return;
}
@@ -282,4 +285,9 @@ public class StreamReadMonitoringFunction
readMetrics = new FlinkStreamReadMetrics(metrics);
readMetrics.registerMetrics();
}
+
+ public String getIssuedOffset() {
+ return issuedOffset;
+ }
+
}
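A note ahead of the test change: the new getIssuedOffset() accessor exists so the test can observe the offset after polling. The Step3 comment in the test quotes the start-offset fallback in IncrementalInputSplits#inputSplits; the following is a hedged sketch of that fallback, with plain String parameters standing in for the real builder call (resolveStartCompletionTime is a hypothetical name used here for illustration):

    class StartOffsetSketch {
      // Models the fallback quoted in the test below: while issuedOffset
      // stays null, every poll starts from READ_START_COMMIT again, and with
      // READ_COMMITS_LIMIT set the reader keeps re-scanning the same empty
      // window instead of making progress.
      static String resolveStartCompletionTime(String issuedOffset, String readStartCommit) {
        return issuedOffset != null ? issuedOffset : readStartCommit;
      }
    }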
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 5602ddaa8e3..db28ce1326f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -19,9 +19,13 @@
package org.apache.hudi.source;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -42,6 +46,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -51,6 +56,8 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -143,6 +150,48 @@ public class TestStreamReadMonitoringFunction {
}
}
+ @Test
+  public void testConsumeForSpeedLimitWhenEmptyCommitExists() throws Exception {
+    // Step1: create 4 empty commits
+ Configuration conf = new Configuration(this.conf);
+ conf.set(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_COPY_ON_WRITE);
+ conf.setBoolean(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), true);
+
+ TestData.writeData(Collections.EMPTY_LIST, conf);
+ TestData.writeData(Collections.EMPTY_LIST, conf);
+ TestData.writeData(Collections.EMPTY_LIST, conf);
+ TestData.writeData(Collections.EMPTY_LIST, conf);
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(conf.get(FlinkOptions.PATH), HoodieTableType.COPY_ON_WRITE);
+    HoodieTimeline commitsTimeline = metaClient.reloadActiveTimeline()
+        .filter(hoodieInstant -> hoodieInstant.getAction().equals(HoodieTimeline.COMMIT_ACTION));
+ HoodieInstant firstInstant = commitsTimeline.firstInstant().get();
+
+    // Step2: trigger streaming read from the first instant and set READ_COMMITS_LIMIT to 2
+ conf.set(FlinkOptions.READ_AS_STREAMING, true);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_CLUSTERING, true);
+ conf.set(FlinkOptions.READ_STREAMING_SKIP_COMPACT, true);
+ conf.set(FlinkOptions.READ_COMMITS_LIMIT, 2);
+    conf.set(FlinkOptions.READ_START_COMMIT, String.valueOf((Long.valueOf(firstInstant.getTimestamp()) - 100)));
+ StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
+    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
+ harness.setup();
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(0);
+      CollectingSourceContext sourceContext = new CollectingSourceContext(latch);
+ function.monitorDirAndForwardSplits(sourceContext);
+      assertEquals(0, sourceContext.splits.size(), "There should be no inputSplits");
+
+      // Step3: assert that the current issuedOffset is not null.
+      // Based on IncrementalInputSplits#inputSplits => .startCompletionTime(issuedOffset != null ? issuedOffset : this.conf.getString(FlinkOptions.READ_START_COMMIT)),
+      // if issuedOffset were still null, Hudi would fall back to FlinkOptions.READ_START_COMMIT again, which means the streaming read would be blocked.
+ assertNotNull(function.getIssuedOffset());
+ // Stop the stream task.
+ function.close();
+ }
+ }
+
@Test
public void testConsumeFromSpecifiedCommit() throws Exception {
    // write 2 commits first, use the second commit time as the specified start instant,