This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 376571af371 [HUDI-7787] Ignore loading data for look up function when the commit has not changed (#11287)
376571af371 is described below
commit 376571af3719834739599f698673f185ac409830
Author: hehuiyuan <[email protected]>
AuthorDate: Sat May 25 08:54:26 2024 +0800
[HUDI-7787] Ignore loading data for look up function when the commit has
not changed (#11287)
---
.../org/apache/hudi/table/HoodieTableSource.java | 3 ++-
.../hudi/table/lookup/HoodieLookupFunction.java | 31 +++++++++++++++++++++-
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 0e3749a493f..9726c1d564a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -310,7 +310,8 @@ public class HoodieTableSource implements
             new HoodieLookupTableReader(this::getBatchInputFormat, conf),
tableRowType,
getLookupKeys(context.getKeys()),
- duration
+ duration,
+ conf
));
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index 3fc65ce12b4..fe32a439f72 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -18,9 +18,17 @@
package org.apache.hudi.table.lookup;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.util.StreamerUtil;
+
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
@@ -64,11 +72,16 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
// timestamp when cache expires
private transient long nextLoadTime;
+ private transient HoodieTableMetaClient metaClient;
+ private transient HoodieInstant currentCommit;
+ private final Configuration conf;
+
public HoodieLookupFunction(
HoodieLookupTableReader partitionReader,
RowType rowType,
int[] lookupKeys,
- Duration reloadInterval) {
+ Duration reloadInterval,
+ Configuration conf) {
this.partitionReader = partitionReader;
this.rowType = rowType;
this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
@@ -78,6 +91,7 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
}
this.reloadInterval = reloadInterval;
this.serializer = InternalSerializers.create(rowType);
+ this.conf = conf;
}
@Override
@@ -85,6 +99,8 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
super.open(context);
cache = new HashMap<>();
nextLoadTime = -1L;
+    org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+    metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
}
@Override
@@ -114,6 +130,19 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
} else {
LOG.info("Populating lookup join cache");
}
+
+
+      HoodieActiveTimeline latestCommit = metaClient.reloadActiveTimeline();
+      Option<HoodieInstant> latestCommitInstant = latestCommit.getCommitsTimeline().lastInstant();
+      if (latestCommit.empty()) {
+        LOG.info("No commit instant found currently.");
+        return;
+      }
+      // Determine whether to reload data by comparing instant
+      if (currentCommit != null && latestCommitInstant.get().equals(currentCommit)) {
+        LOG.info("Ignore loading data because the commit instant " + currentCommit + " has not changed.");
+        return;
+      }
+
+
int numRetry = 0;
while (true) {
cache.clear();