This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ee6cd456d8f8df7422155a0fc96d9295e2e8a7f2
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Oct 10 09:53:20 2025 +0800

    fix: Fix output type extracting for key selector in flink stream read (#14065)
---
 .../src/main/java/org/apache/hudi/table/HoodieTableSource.java | 8 +++++++-
 .../org/apache/hudi/source/TestStreamReadMonitoringFunction.java | 2 +-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index a88e1fec5cfc..c6c1b3d2b871 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -80,6 +80,7 @@ import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -253,7 +254,12 @@ public class HoodieTableSource implements
     } else if (OptionsResolver.isAppendMode(conf)) {
       return source.partitionCustom(new StreamReadAppendPartitioner(conf.get(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector());
     } else {
-      return source.keyBy(MergeOnReadInputSplit::getFileId);
+      return source.keyBy(new KeySelector<MergeOnReadInputSplit, String>() {
+        @Override
+        public String getKey(MergeOnReadInputSplit split) throws Exception {
+          return split.getFileId();
+        }
+      });
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 78ba30207353..5df02bae6e06 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -466,7 +466,6 @@ public class TestStreamReadMonitoringFunction {
   public void testCheckpointRestoreWithLimit() throws Exception {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     conf.set(FlinkOptions.READ_SPLITS_LIMIT, 2);
-    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     OperatorSubtaskState state;
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
@@ -483,6 +482,7 @@ public class TestStreamReadMonitoringFunction {
       assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
           "All instants should have range limit");
     }
+    conf.set(FlinkOptions.READ_SPLITS_LIMIT, Integer.MAX_VALUE);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
