This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2f53be36bf [HUDI-5107] Fix inconsistent hadoop config across DirectWriteMarkers,
HoodieFlinkEngineContext and StreamerUtil (#7094)
2f53be36bf is described below
commit 2f53be36bfbc95e1749d6ff4859ee71330470e48
Author: FocusComputing <[email protected]>
AuthorDate: Tue Nov 1 10:14:24 2022 +0800
[HUDI-5107] Fix inconsistent hadoop config across DirectWriteMarkers,
HoodieFlinkEngineContext and StreamerUtil (#7094)
Co-authored-by: xiaoxingstack <[email protected]>
---
.../main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java | 2 +-
.../java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java | 4 ++++
.../hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java | 2 +-
3 files changed, 6 insertions(+), 2 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
index f1a7cde432..27136dd1d2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java
@@ -106,7 +106,7 @@ public class DirectWriteMarkers extends WriteMarkers {
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining marker
files for all created, merged paths");
dataFiles.addAll(context.flatMap(subDirectories, directory -> {
Path path = new Path(directory);
- FileSystem fileSystem = path.getFileSystem(serializedConf.get());
+ FileSystem fileSystem = FSUtils.getFs(path, serializedConf.get());
RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path,
true);
List<String> result = new ArrayList<>();
while (itr.hasNext()) {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 5e38c24d30..c9136da6bb 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -67,6 +67,10 @@ public class HoodieFlinkEngineContext extends
HoodieEngineContext {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new
DefaultTaskContextSupplier());
}
+ public HoodieFlinkEngineContext(org.apache.hadoop.conf.Configuration
hadoopConf) {
+ this(new SerializableConfiguration(hadoopConf), new
DefaultTaskContextSupplier());
+ }
+
public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) {
this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()),
taskContextSupplier);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 29c0e2c061..588b4ad771 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -440,7 +440,7 @@ public class StreamerUtil {
public static HoodieFlinkWriteClient createWriteClient(Configuration conf)
throws IOException {
HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false);
// build the write client to start the embedded timeline server
- final HoodieFlinkWriteClient writeClient = new
HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig);
+ final HoodieFlinkWriteClient writeClient = new
HoodieFlinkWriteClient<>(new
HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)),
writeConfig);
writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)));
// create the filesystem view storage properties for client
final FileSystemViewStorageConfig viewStorageConfig =
writeConfig.getViewStorageConfig();