This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 06d3fad0194 [HUDI-8041] Support projection push down for lookup join (#11722)
06d3fad0194 is described below
commit 06d3fad0194c968df7dcb9c852b457e985d3e7c5
Author: Bingeng Huang <[email protected]>
AuthorDate: Sat Aug 3 10:35:12 2024 +0800
[HUDI-8041] Support projection push down for lookup join (#11722)
Co-authored-by: hbg <[email protected]>
---
.../src/main/java/org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../test/java/org/apache/hudi/table/ITTestHoodieDataSource.java | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index bd941a95122..d1c7d7292d8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -308,7 +308,7 @@ public class HoodieTableSource implements
return TableFunctionProvider.of(
new HoodieLookupFunction(
new HoodieLookupTableReader(this::getBatchInputFormat, conf),
- tableRowType,
+ (RowType) getProducedDataType().notNull().getLogicalType(),
getLookupKeys(context.getKeys()),
duration,
conf
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 7cd1f4b3e7b..e0147f6ed8a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -698,14 +698,14 @@ public class ITTestHoodieDataSource {
void testLookupJoin(HoodieTableType tableType) {
TableEnvironment tableEnv = streamTableEnv;
String hoodieTableDDL = sql("t1")
- .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
- .option(FlinkOptions.TABLE_NAME, tableType)
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t1")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
.end();
tableEnv.executeSql(hoodieTableDDL);
String hoodieTableDDL2 = sql("t2")
- .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
- .option(FlinkOptions.TABLE_NAME, tableType)
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath() + "/t2")
+ .option(FlinkOptions.TABLE_TYPE, tableType)
.end();
tableEnv.executeSql(hoodieTableDDL2);