This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f53be36bf [HUDI-5107] Fix hadoop config in DirectWriteMarkers, 
HoodieFlinkEngineContext and StreamerUtil are not consistent issue (#7094)
2f53be36bf is described below

commit 2f53be36bfbc95e1749d6ff4859ee71330470e48
Author: FocusComputing <[email protected]>
AuthorDate: Tue Nov 1 10:14:24 2022 +0800

    [HUDI-5107] Fix hadoop config in DirectWriteMarkers, 
HoodieFlinkEngineContext and StreamerUtil are not consistent issue (#7094)
    
    Co-authored-by: xiaoxingstack <[email protected]>
---
 .../main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java    | 2 +-
 .../java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java  | 4 ++++
 .../hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java   | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index f1a7cde432..27136dd1d2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -106,7 +106,7 @@ public class DirectWriteMarkers extends WriteMarkers {
       context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker 
files for all created, merged paths");
       dataFiles.addAll(context.flatMap(subDirectories, directory -> {
         Path path = new Path(directory);
-        FileSystem fileSystem = path.getFileSystem(serializedConf.get());
+        FileSystem fileSystem = FSUtils.getFs(path, serializedConf.get());
         RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, 
true);
         List<String> result = new ArrayList<>();
         while (itr.hasNext()) {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 5e38c24d30..c9136da6bb 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -67,6 +67,10 @@ public class HoodieFlinkEngineContext extends 
HoodieEngineContext {
     this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new 
DefaultTaskContextSupplier());
   }
 
+  public HoodieFlinkEngineContext(org.apache.hadoop.conf.Configuration 
hadoopConf) {
+    this(new SerializableConfiguration(hadoopConf), new 
DefaultTaskContextSupplier());
+  }
+
   public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
     this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), 
taskContextSupplier);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 29c0e2c061..588b4ad771 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -440,7 +440,7 @@ public class StreamerUtil {
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf) 
throws IOException {
     HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
     // build the write client to start the embedded timeline server
-    final HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
+    final HoodieFlinkWriteClient writeClient = new 
HoodieFlinkWriteClient<>(new 
HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), 
writeConfig);
     
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
     // create the filesystem view storage properties for client
     final FileSystemViewStorageConfig viewStorageConfig = 
writeConfig.getViewStorageConfig();

Reply via email to