This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ee6cd456d8f8df7422155a0fc96d9295e2e8a7f2
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Oct 10 09:53:20 2025 +0800

    fix: Fix output type extracting for key selector in flink stream read (#14065)
---
 .../src/main/java/org/apache/hudi/table/HoodieTableSource.java    | 8 +++++++-
 .../org/apache/hudi/source/TestStreamReadMonitoringFunction.java  | 2 +-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index a88e1fec5cfc..c6c1b3d2b871 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -80,6 +80,7 @@ import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -253,7 +254,12 @@ public class HoodieTableSource implements
     } else if (OptionsResolver.isAppendMode(conf)) {
      return source.partitionCustom(new StreamReadAppendPartitioner(conf.get(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector());
     } else {
-      return source.keyBy(MergeOnReadInputSplit::getFileId);
+      return source.keyBy(new KeySelector<MergeOnReadInputSplit, String>() {
+        @Override
+        public String getKey(MergeOnReadInputSplit split) throws Exception {
+          return split.getFileId();
+        }
+      });
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 78ba30207353..5df02bae6e06 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -466,7 +466,6 @@ public class TestStreamReadMonitoringFunction {
   public void testCheckpointRestoreWithLimit() throws Exception {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     conf.set(FlinkOptions.READ_SPLITS_LIMIT, 2);
-    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     OperatorSubtaskState state;
    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
@@ -483,6 +482,7 @@ public class TestStreamReadMonitoringFunction {
      assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
           "All instants should have range limit");
     }
+    conf.set(FlinkOptions.READ_SPLITS_LIMIT, Integer.MAX_VALUE);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {

Reply via email to