This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2291e995feb [HUDI-7795] Fix loading of input splits from look up table 
reader (#11297)
2291e995feb is described below

commit 2291e995feb3027ac35305383518e4bd6677b9b1
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 29 08:17:05 2024 +0800

    [HUDI-7795] Fix loading of input splits from look up table reader (#11297)
---
 .../apache/hudi/table/lookup/HoodieLookupFunction.java |  2 +-
 .../hudi/table/lookup/HoodieLookupTableReader.java     | 18 ++++++++++++++++--
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index fe32a439f72..a43bf1189fb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -47,7 +47,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Lookup function for filesystem connector tables.
+ * Lookup function for Hoodie dimension table.
  *
  * <p>Note: reference Flink FileSystemLookupFunction to avoid additional 
connector jar dependencies.
  */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
index 0460cd42691..642a03ee65b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
@@ -29,6 +29,9 @@ import org.jetbrains.annotations.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Hudi look up table reader.
@@ -41,6 +44,8 @@ public class HoodieLookupTableReader implements Serializable {
 
   private InputFormat inputFormat;
 
+  private List<InputSplit> inputSplits;
+
   public HoodieLookupTableReader(SerializableSupplier<InputFormat<RowData, ?>> 
inputFormatSupplier, Configuration conf) {
     this.inputFormatSupplier = inputFormatSupplier;
     this.conf = conf;
@@ -49,15 +54,24 @@ public class HoodieLookupTableReader implements 
Serializable {
   public void open() throws IOException {
     this.inputFormat = inputFormatSupplier.get();
     inputFormat.configure(conf);
-    InputSplit[] inputSplits = inputFormat.createInputSplits(1);
+    this.inputSplits = 
Arrays.stream(inputFormat.createInputSplits(1)).collect(Collectors.toList());
     ((RichInputFormat) inputFormat).openInputFormat();
-    inputFormat.open(inputSplits[0]);
+    inputFormat.open(inputSplits.remove(0));
   }
 
   @Nullable
   public RowData read(RowData reuse) throws IOException {
     if (!inputFormat.reachedEnd()) {
       return (RowData) inputFormat.nextRecord(reuse);
+    } else {
+      while (!inputSplits.isEmpty()) {
+        // release the last itr first.
+        inputFormat.close();
+        inputFormat.open(inputSplits.remove(0));
+        if (!inputFormat.reachedEnd()) {
+          return (RowData) inputFormat.nextRecord(reuse);
+        }
+      }
     }
     return null;
   }

Reply via email to