This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2291e995feb [HUDI-7795] Fix loading of input splits from look up table reader (#11297)
2291e995feb is described below
commit 2291e995feb3027ac35305383518e4bd6677b9b1
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 29 08:17:05 2024 +0800
[HUDI-7795] Fix loading of input splits from look up table reader (#11297)
---
.../apache/hudi/table/lookup/HoodieLookupFunction.java | 2 +-
.../hudi/table/lookup/HoodieLookupTableReader.java | 18 ++++++++++++++++--
2 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index fe32a439f72..a43bf1189fb 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -47,7 +47,7 @@ import java.util.List;
import java.util.Map;
/**
- * Lookup function for filesystem connector tables.
+ * Lookup function for Hoodie dimension table.
*
* <p>Note: reference Flink FileSystemLookupFunction to avoid additional connector jar dependencies.
*/
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
index 0460cd42691..642a03ee65b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupTableReader.java
@@ -29,6 +29,9 @@ import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* Hudi look up table reader.
@@ -41,6 +44,8 @@ public class HoodieLookupTableReader implements Serializable {
private InputFormat inputFormat;
+ private List<InputSplit> inputSplits;
+
public HoodieLookupTableReader(SerializableSupplier<InputFormat<RowData, ?>> inputFormatSupplier, Configuration conf) {
this.inputFormatSupplier = inputFormatSupplier;
this.conf = conf;
@@ -49,15 +54,24 @@ public class HoodieLookupTableReader implements Serializable {
public void open() throws IOException {
this.inputFormat = inputFormatSupplier.get();
inputFormat.configure(conf);
- InputSplit[] inputSplits = inputFormat.createInputSplits(1);
+ this.inputSplits = Arrays.stream(inputFormat.createInputSplits(1)).collect(Collectors.toList());
((RichInputFormat) inputFormat).openInputFormat();
- inputFormat.open(inputSplits[0]);
+ inputFormat.open(inputSplits.remove(0));
}
@Nullable
public RowData read(RowData reuse) throws IOException {
if (!inputFormat.reachedEnd()) {
return (RowData) inputFormat.nextRecord(reuse);
+ } else {
+ while (!inputSplits.isEmpty()) {
+ // release the last itr first.
+ inputFormat.close();
+ inputFormat.open(inputSplits.remove(0));
+ if (!inputFormat.reachedEnd()) {
+ return (RowData) inputFormat.nextRecord(reuse);
+ }
+ }
}
return null;
}