HuangZhenQiu commented on code in PR #17872:
URL: https://github.com/apache/hudi/pull/17872#discussion_r2699624715


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java:
##########
@@ -67,15 +70,27 @@ public HoodieSourceSplitReader(
 
   @Override
   public RecordsWithSplitIds<HoodieRecordWithPosition<T>> fetch() throws IOException {
-    HoodieSourceSplit nextSplit = splits.poll();
-    if (nextSplit != null) {
-      currentSplit = nextSplit;
-      currentSplitId = nextSplit.splitId();
-      return readerFunction.read(currentSplit);
+    if (currentReader == null) {
+      HoodieSourceSplit nextSplit = splits.poll();
+      if (nextSplit != null) {
+        currentSplit = nextSplit;
+        currentSplitId = nextSplit.splitId();
+        currentReader = readerFunction.read(currentSplit);

Review Comment:
   RecordsWithSplitIds can only read from one split. As line 86 shows, once 
currentReader is created for a split, it is used to read all of the data from 
that particular split. 
   
   For failover recovery, the split's record offset is updated only when a 
record is emitted:  
https://github.com/apache/hudi/blob/master/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieRecordEmitter.java#L34.
 Only Hoodie source splits are checkpointed. On recovery from a failure, the 
record offset stored in the Hoodie source split is used to move the iterator of 
HoodieFileGroupReader to the right record offset.
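
   To illustrate the recovery flow described above, here is a minimal sketch. 
It is not the Hudi implementation: SketchSplit, SketchEmitter, SketchReader and 
RecoverySketchDemo are hypothetical stand-ins, and a plain list stands in for 
the file group data. It only shows the idea that the offset advances on emit, 
that the split is the only checkpointed state, and that a restored split skips 
the records that were already emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a source split; this is the only checkpointed state.
final class SketchSplit {
  final String splitId;
  final List<String> records; // stand-in for the data behind the split
  long consumed;              // number of records already emitted downstream

  SketchSplit(String splitId, List<String> records, long consumed) {
    this.splitId = splitId;
    this.records = records;
    this.consumed = consumed;
  }
}

// Hypothetical stand-in for the record emitter: the offset moves only on emit.
final class SketchEmitter {
  static void emit(String record, List<String> output, SketchSplit split) {
    output.add(record);
    split.consumed++;
  }
}

// Hypothetical stand-in for the reader: one iterator per split.
final class SketchReader {
  // Opens the split and skips the records emitted before the last checkpoint.
  static Iterator<String> open(SketchSplit split) {
    Iterator<String> it = split.records.iterator();
    for (long i = 0; i < split.consumed && it.hasNext(); i++) {
      it.next();
    }
    return it;
  }
}

final class RecoverySketchDemo {
  public static void main(String[] args) {
    SketchSplit split = new SketchSplit("split-1", Arrays.asList("a", "b", "c", "d"), 0);
    List<String> output = new ArrayList<>();

    // Emit two records, then "checkpoint" by copying the split state.
    Iterator<String> reader = SketchReader.open(split);
    SketchEmitter.emit(reader.next(), output, split);
    SketchEmitter.emit(reader.next(), output, split);
    SketchSplit restored = new SketchSplit(split.splitId, split.records, split.consumed);

    // After a simulated failure, a new reader resumes from the stored offset.
    Iterator<String> recovered = SketchReader.open(restored);
    while (recovered.hasNext()) {
      SketchEmitter.emit(recovered.next(), output, restored);
    }
    System.out.println(output);
  }
}
```

   In this sketch no record is emitted twice, because the restored split 
carries the count of records that were emitted before the checkpoint.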



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
