This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6c79a68ad91bac8f3bb0e4a97915e6ede90cfe6a
Author: Danny Chan <[email protected]>
AuthorDate: Fri Dec 3 08:59:10 2021 +0800

    [HUDI-2914] Fix remote timeline server config for flink (#4191)
    
    (cherry picked from commit 934fe54cc57b508875383cd807735b1323fef754)
---
 .../sink/partitioner/BucketAssignFunction.java     |  6 ++---
 .../hudi/sink/partitioner/BucketAssigner.java      |  6 +----
 .../partitioner/profile/DeltaWriteProfile.java     | 12 ++++-----
 .../sink/partitioner/profile/WriteProfile.java     | 29 ++++++----------------
 .../java/org/apache/hudi/util/StreamerUtil.java    |  8 ++++--
 .../hudi/sink/partitioner/TestBucketAssigner.java  |  2 +-
 .../org/apache/hudi/utils/TestStreamerUtil.java    | 11 ++++++++
 7 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 33f1dd6..73fd668 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -121,16 +121,16 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
         getRuntimeContext().getIndexOfThisSubtask(),
         getRuntimeContext().getMaxNumberOfParallelSubtasks(),
         getRuntimeContext().getNumberOfParallelSubtasks(),
-        ignoreSmallFiles(writeConfig),
+        ignoreSmallFiles(),
         HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)),
         context,
         writeConfig);
     this.payloadCreation = PayloadCreation.instance(this.conf);
   }
 
-  private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) {
+  private boolean ignoreSmallFiles() {
     WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION));
-    return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts();
+    return WriteOperationType.isOverwrite(operationType);
   }
 
   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index f9d5b1c..e73890f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.commit.SmallFile;
@@ -188,6 +187,7 @@ public class BucketAssigner implements AutoCloseable {
       smallFileAssignMap.put(partitionPath, assign);
       return assign;
     }
+    smallFileAssignMap.put(partitionPath, null);
     return null;
   }
 
@@ -211,10 +211,6 @@ public class BucketAssigner implements AutoCloseable {
     this.writeProfile.reload(checkpointId);
   }
 
-  public HoodieTable<?, ?, ?, ?> getTable() {
-    return this.writeProfile.getTable();
-  }
-
   private boolean fileIdOfThisTask(String fileId) {
     // the file id can shuffle to this task
     return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index 922c056..1f08c5a 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.commit.SmallFile;
 
@@ -55,13 +55,11 @@ public class DeltaWriteProfile extends WriteProfile {
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // initialize the filesystem view based on the commit metadata
-      initFileSystemView();
-      // find smallest file in partition and append to it
+      // find the smallest file in partition and append to it
       List<FileSlice> allSmallFileSlices = new ArrayList<>();
       // If we can index log files, we can add more inserts to log files for fileIds including those under
       // pending compaction.
-      List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+      List<FileSlice> allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
           .collect(Collectors.toList());
       for (FileSlice fileSlice : allFileSlices) {
         if (isSmallFile(fileSlice)) {
@@ -91,8 +89,8 @@ public class DeltaWriteProfile extends WriteProfile {
     return smallFileLocations;
   }
 
-  protected AbstractTableFileSystemView getFileSystemView() {
-    return (AbstractTableFileSystemView) this.table.getSliceView();
+  protected SyncableFileSystemView getFileSystemView() {
+    return (SyncableFileSystemView) this.table.getSliceView();
   }
 
   private long getTotalFileSize(FileSlice fileSlice) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index 1171a54..98eb29c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -23,9 +23,10 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.AbstractTableFileSystemView;
+import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.HoodieFlinkTable;
@@ -92,11 +93,6 @@ public class WriteProfile {
   private long reloadedCheckpointId;
 
   /**
-   * The file system view cache for one checkpoint interval.
-   */
-  protected AbstractTableFileSystemView fsView;
-
-  /**
    * Metadata cache to reduce IO of metadata files.
    */
   private final Map<String, HoodieCommitMetadata> metadataCache;
@@ -120,8 +116,8 @@ public class WriteProfile {
     return recordsPerBucket;
   }
 
-  public HoodieTable<?, ?, ?, ?> getTable() {
-    return table;
+  public HoodieTableMetaClient getMetaClient() {
+    return this.table.getMetaClient();
   }
 
   /**
@@ -183,9 +179,7 @@ public class WriteProfile {
 
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // initialize the filesystem view based on the commit metadata
-      initFileSystemView();
-      List<HoodieBaseFile> allFiles = fsView
+      List<HoodieBaseFile> allFiles = getFileSystemView()
           .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
 
       for (HoodieBaseFile file : allFiles) {
@@ -203,15 +197,8 @@ public class WriteProfile {
     return smallFileLocations;
   }
 
-  @VisibleForTesting
-  public void initFileSystemView() {
-    if (fsView == null) {
-      fsView = getFileSystemView();
-    }
-  }
-
-  protected AbstractTableFileSystemView getFileSystemView() {
-    return (AbstractTableFileSystemView) this.table.getBaseFileOnlyView();
+  protected SyncableFileSystemView getFileSystemView() {
+    return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView();
   }
 
   /**
@@ -245,9 +232,7 @@ public class WriteProfile {
       return;
     }
     this.table.getMetaClient().reloadActiveTimeline();
-    this.table.getHoodieView().sync();
     recordProfile();
-    this.fsView = null;
     cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants());
     this.smallFilesMap.clear();
     this.reloadedCheckpointId = checkpointId;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 28a70b9..1e39c92 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -365,6 +365,7 @@ public class StreamerUtil {
    *
    * <p>This expects to be used by client, the driver should start an embedded timeline server.
    */
+  @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
     HoodieFlinkEngineContext context =
         new HoodieFlinkEngineContext(
@@ -382,17 +383,20 @@ public class StreamerUtil {
    *
    * <p>The task context supplier is a constant: the write token is always '0-1-0'.
    */
+  @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException {
     HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
+    // build the write client to start the embedded timeline server
+    final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
     // create the filesystem view storage properties for client
-    FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
+    final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig();
     // rebuild the view storage config with simplified options.
     FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder()
         .withStorageType(viewStorageConfig.getStorageType())
         .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost())
         .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build();
     ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt);
-    return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
+    return writeClient;
   }
 
   /**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 053c2a3..4f4b549 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -401,7 +401,7 @@ public class TestBucketAssigner {
   }
 
   private static String getLastCompleteInstant(WriteProfile profile) {
-    return StreamerUtil.getLastCompletedInstant(profile.getTable().getMetaClient());
+    return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
   }
 
   private void assertBucketEquals(
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index e00fbfa..57297c5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -19,9 +19,12 @@
 package org.apache.hudi.utils;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.util.StreamerUtil;
+import org.apache.hudi.util.ViewStorageProperties;
 
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
@@ -98,5 +101,13 @@ public class TestStreamerUtil {
     long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
     assertThat(diff, is(75L));
   }
+
+  @Test
+  void testDumpRemoteViewStorageConfig() throws IOException {
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    StreamerUtil.createWriteClient(conf);
+    FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH));
+    assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST));
+  }
 }
 

Reply via email to