This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 376571af371 [HUDI-7787] Ignore loading data for look up function when the commit has not changed (#11287)
376571af371 is described below

commit 376571af3719834739599f698673f185ac409830
Author: hehuiyuan <[email protected]>
AuthorDate: Sat May 25 08:54:26 2024 +0800

    [HUDI-7787] Ignore loading data for look up function when the commit has not changed (#11287)
---
 .../org/apache/hudi/table/HoodieTableSource.java   |  3 ++-
 .../hudi/table/lookup/HoodieLookupFunction.java    | 31 +++++++++++++++++++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 0e3749a493f..9726c1d564a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -310,7 +310,8 @@ public class HoodieTableSource implements
             new HoodieLookupTableReader(this::getBatchInputFormat, conf),
             tableRowType,
             getLookupKeys(context.getKeys()),
-            duration
+            duration,
+            conf
         ));
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
index 3fc65ce12b4..fe32a439f72 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/lookup/HoodieLookupFunction.java
@@ -18,9 +18,17 @@
 
 package org.apache.hudi.table.lookup;
 
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.util.StreamerUtil;
+
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
@@ -64,11 +72,16 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
   // timestamp when cache expires
   private transient long nextLoadTime;
 
+  private transient HoodieTableMetaClient metaClient;
+  private transient HoodieInstant currentCommit;
+  private final Configuration conf;
+
   public HoodieLookupFunction(
       HoodieLookupTableReader partitionReader,
       RowType rowType,
       int[] lookupKeys,
-      Duration reloadInterval) {
+      Duration reloadInterval,
+      Configuration conf) {
     this.partitionReader = partitionReader;
     this.rowType = rowType;
     this.lookupFieldGetters = new RowData.FieldGetter[lookupKeys.length];
@@ -78,6 +91,7 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
     }
     this.reloadInterval = reloadInterval;
     this.serializer = InternalSerializers.create(rowType);
+    this.conf = conf;
   }
 
   @Override
@@ -85,6 +99,8 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
     super.open(context);
     cache = new HashMap<>();
     nextLoadTime = -1L;
+    org.apache.hadoop.conf.Configuration hadoopConf = HadoopConfigurations.getHadoopConf(conf);
+    metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
   }
 
   @Override
@@ -114,6 +130,19 @@ public class HoodieLookupFunction extends TableFunction<RowData> {
     } else {
       LOG.info("Populating lookup join cache");
     }
+
+    HoodieActiveTimeline latestCommit = metaClient.reloadActiveTimeline();
+    Option<HoodieInstant> latestCommitInstant = latestCommit.getCommitsTimeline().lastInstant();
+    if (latestCommit.empty()) {
+      LOG.info("No commit instant found currently.");
+      return;
+    }
+    // Determine whether to reload data by comparing instant
+    if (currentCommit != null && latestCommitInstant.get().equals(currentCommit)) {
+      LOG.info("Ignore loading data because the commit instant " + currentCommit + " has not changed.");
+      return;
+    }
+
     int numRetry = 0;
     while (true) {
       cache.clear();

Reply via email to