This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2a4e7186883eb97eca7a5072844f47c77fbf1ef4
Author: Danny Chan <[email protected]>
AuthorDate: Fri Dec 3 16:12:59 2021 +0800

    [HUDI-2924] Refresh the fs view on successful checkpoints for write profile 
(#4199)
    
    (cherry picked from commit 0699521f83eab843279ff7d78ac2b92fc80ad79b)
---
 .../src/main/java/org/apache/hudi/sink/CleanFunction.java   |  4 +++-
 .../apache/hudi/sink/partitioner/BucketAssignFunction.java  |  2 +-
 .../hudi/sink/partitioner/profile/DeltaWriteProfile.java    |  2 +-
 .../apache/hudi/sink/partitioner/profile/WriteProfile.java  |  9 ++++++++-
 .../src/main/java/org/apache/hudi/util/StreamerUtil.java    | 13 ++++++++++++-
 5 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java 
b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 26ac9f3..13154b2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -60,7 +60,9 @@ public class CleanFunction<T> extends AbstractRichFunction
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
-      this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext());
+      // do not use the remote filesystem view, because the async cleaning service's
+      // local timeline is very likely to fall behind the remote one.
+      this.writeClient = StreamerUtil.createWriteClient(conf, 
getRuntimeContext(), false);
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
   }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 73fd668..14cad16 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -113,7 +113,7 @@ public class BucketAssignFunction<K, I, O extends 
HoodieRecord<?>>
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
-    HoodieWriteConfig writeConfig = 
StreamerUtil.getHoodieClientConfig(this.conf, true);
+    HoodieWriteConfig writeConfig = 
StreamerUtil.getHoodieClientConfig(this.conf, false);
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
         new SerializableConfiguration(StreamerUtil.getHadoopConf()),
         new FlinkTaskContextSupplier(getRuntimeContext()));
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index e1c890c..97b6b23 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -59,7 +59,7 @@ public class DeltaWriteProfile extends WriteProfile {
       List<FileSlice> allSmallFileSlices = new ArrayList<>();
       // If we can index log files, we can add more inserts to log files for 
fileIds including those under
       // pending compaction.
-      List<FileSlice> allFileSlices = 
getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), true)
+      List<FileSlice> allFileSlices = 
fsView.getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), true)
           .collect(Collectors.toList());
       for (FileSlice fileSlice : allFileSlices) {
         if (isSmallFile(fileSlice)) {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index d13162f..f6840e5 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -94,6 +94,11 @@ public class WriteProfile {
   private long reloadedCheckpointId;
 
   /**
+   * The file system view cache for one checkpoint interval.
+   */
+  protected SyncableFileSystemView fsView;
+
+  /**
    * Metadata cache to reduce IO of metadata files.
    */
   private final Map<String, HoodieCommitMetadata> metadataCache;
@@ -111,6 +116,7 @@ public class WriteProfile {
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
     this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), 
context.getHadoopConf().get());
     this.metadataCache = new HashMap<>();
+    this.fsView = getFileSystemView();
     // profile the record statistics on construction
     recordProfile();
   }
@@ -190,7 +196,7 @@ public class WriteProfile {
 
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      List<HoodieBaseFile> allFiles = getFileSystemView()
+      List<HoodieBaseFile> allFiles = fsView
           .getLatestBaseFilesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
       for (HoodieBaseFile file : allFiles) {
@@ -243,6 +249,7 @@ public class WriteProfile {
       return;
     }
     this.metaClient.reloadActiveTimeline();
+    this.fsView = getFileSystemView();
     recordProfile();
     
cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants());
     this.smallFilesMap.clear();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 1e39c92..c417d7d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -367,12 +367,23 @@ public class StreamerUtil {
    */
   @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf, 
RuntimeContext runtimeContext) {
+    return createWriteClient(conf, runtimeContext, true);
+  }
+
+  /**
+   * Creates the Flink write client.
+   *
+   * <p>This is expected to be used by the client. Set the flag {@code
+   * loadFsViewStorageConfig} to true to use the remote filesystem view storage
+   * config; otherwise an in-memory filesystem view storage is used.
+   */
+  @SuppressWarnings("rawtypes")
+  public static HoodieFlinkWriteClient createWriteClient(Configuration conf, 
RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
     HoodieFlinkEngineContext context =
         new HoodieFlinkEngineContext(
             new SerializableConfiguration(getHadoopConf()),
             new FlinkTaskContextSupplier(runtimeContext));
 
-    HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
+    HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, 
loadFsViewStorageConfig);
     return new HoodieFlinkWriteClient<>(context, writeConfig);
   }
 

Reply via email to