This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e1fc3d1b0 [Hotfix][Connector-V2][Hive] Fix npe of getting file system 
(#3506)
e1fc3d1b0 is described below

commit e1fc3d1b01358f3f387ad5b7436f10aa6b4a1b80
Author: Tyrantlucifer <[email protected]>
AuthorDate: Thu Nov 24 21:03:16 2022 +0800

    [Hotfix][Connector-V2][Hive] Fix npe of getting file system (#3506)
    
    * [Hotfix][Connector-V2][Hive] Fix npe of getting file system
    
    * [Hotfix][Connector-V2][Hive] Remove redundant char
---
 .../connectors/seatunnel/file/sink/BaseFileSink.java         |  2 +-
 .../file/sink/commit/FileSinkAggregatedCommitter.java        |  8 ++++++++
 .../connectors/seatunnel/file/sink/util/FileSystemUtils.java | 11 +++++++++++
 .../seatunnel/hive/commit/HiveSinkAggregatedCommitter.java   | 12 +++++++-----
 .../seatunnel/connectors/seatunnel/hive/sink/HiveSink.java   |  2 +-
 5 files changed, 28 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 798de1d10..6bbb2598b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -77,7 +77,7 @@ public abstract class BaseFileSink implements 
SeaTunnelSink<SeaTunnelRow, FileSi
 
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
-        return Optional.of(new FileSinkAggregatedCommitter());
+        return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index a0f1516e9..3dbe562e1 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
 import lombok.extern.slf4j.Slf4j;
@@ -30,6 +31,13 @@ import java.util.Map;
 
 @Slf4j
 public class FileSinkAggregatedCommitter implements 
SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
+    protected final HadoopConf hadoopConf;
+
+    public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
+        this.hadoopConf = hadoopConf;
+        FileSystemUtils.CONF = FileSystemUtils.getConfiguration(hadoopConf);
+        log.info("Hadoop configuration initial done, [{}]", hadoopConf);
+    }
 
     @Override
     public List<FileAggregatedCommitInfo> 
commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException 
{
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index bb69cf6b3..dff9ba57e 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -17,9 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
 
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +41,14 @@ public class FileSystemUtils {
 
     public static Configuration CONF;
 
+    public static Configuration getConfiguration(HadoopConf hadoopConf) {
+        Configuration configuration = new Configuration();
+        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
hadoopConf.getHdfsNameKey());
+        configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), 
hadoopConf.getFsHdfsImpl());
+        hadoopConf.setExtraOptionsForConfiguration(configuration);
+        return configuration;
+    }
+
     public static FileSystem getFileSystem(@NonNull String path) throws 
IOException {
         FileSystem fileSystem = 
FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), CONF);
         fileSystem.setWriteChecksum(false);
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 410d9ec8c..01f5a9d27 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.commit;
 
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
@@ -37,7 +38,8 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
     private final String dbName;
     private final String tableName;
 
-    public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName, 
String tableName) {
+    public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName, 
String tableName, HadoopConf hadoopConf) {
+        super(hadoopConf);
         this.pluginConfig = pluginConfig;
         this.dbName = dbName;
         this.tableName = tableName;
@@ -55,9 +57,9 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
                         .collect(Collectors.toList());
                 try {
                     hiveMetaStore.addPartitions(dbName, tableName, partitions);
-                    log.info("Add these partitions [{}]", partitions);
+                    log.info("Add these partitions {}", partitions);
                 } catch (TException e) {
-                    log.error("Failed to add these partitions [{}]", 
partitions);
+                    log.error("Failed to add these partitions {}", partitions);
                     errorCommitInfos.add(aggregatedCommitInfo);
                 }
             }
@@ -77,9 +79,9 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
                     .collect(Collectors.toList());
             try {
                 hiveMetaStore.dropPartitions(dbName, tableName, partitions);
-                log.info("Remove these partitions [{}]", partitions);
+                log.info("Remove these partitions {}", partitions);
             } catch (TException e) {
-                log.error("Failed to remove these partitions [{}]", 
partitions);
+                log.error("Failed to remove these partitions {}", partitions);
             }
         }
         hiveMetaStore.close();
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index ac94e56e9..0e5a60532 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -130,6 +130,6 @@ public class HiveSink extends BaseHdfsFileSink {
 
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
-        return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig, 
dbName, tableName));
+        return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig, 
dbName, tableName, hadoopConf));
     }
 }

Reply via email to