This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e1fc3d1b0 [Hotfix][Connector-V2][Hive] Fix npe of getting file system
(#3506)
e1fc3d1b0 is described below
commit e1fc3d1b01358f3f387ad5b7436f10aa6b4a1b80
Author: Tyrantlucifer <[email protected]>
AuthorDate: Thu Nov 24 21:03:16 2022 +0800
[Hotfix][Connector-V2][Hive] Fix npe of getting file system (#3506)
* [Hotfix][Connector-V2][Hive] Fix npe of getting file system
* [Hotfix][Connector-V2][Hive] Remove redundant char
---
.../connectors/seatunnel/file/sink/BaseFileSink.java | 2 +-
.../file/sink/commit/FileSinkAggregatedCommitter.java | 8 ++++++++
.../connectors/seatunnel/file/sink/util/FileSystemUtils.java | 11 +++++++++++
.../seatunnel/hive/commit/HiveSinkAggregatedCommitter.java | 12 +++++++-----
.../seatunnel/connectors/seatunnel/hive/sink/HiveSink.java | 2 +-
5 files changed, 28 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 798de1d10..6bbb2598b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -77,7 +77,7 @@ public abstract class BaseFileSink implements
SeaTunnelSink<SeaTunnelRow, FileSi
@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
- return Optional.of(new FileSinkAggregatedCommitter());
+ return Optional.of(new FileSinkAggregatedCommitter(hadoopConf));
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index a0f1516e9..3dbe562e1 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import lombok.extern.slf4j.Slf4j;
@@ -30,6 +31,13 @@ import java.util.Map;
@Slf4j
public class FileSinkAggregatedCommitter implements
SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
+ protected final HadoopConf hadoopConf;
+
+ public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
+ this.hadoopConf = hadoopConf;
+ FileSystemUtils.CONF = FileSystemUtils.getConfiguration(hadoopConf);
+ log.info("Hadoop configuration initial done, [{}]", hadoopConf);
+ }
@Override
public List<FileAggregatedCommitInfo>
commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException
{
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index bb69cf6b3..dff9ba57e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -17,9 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -38,6 +41,14 @@ public class FileSystemUtils {
public static Configuration CONF;
+ public static Configuration getConfiguration(HadoopConf hadoopConf) {
+ Configuration configuration = new Configuration();
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
hadoopConf.getHdfsNameKey());
+ configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()),
hadoopConf.getFsHdfsImpl());
+ hadoopConf.setExtraOptionsForConfiguration(configuration);
+ return configuration;
+ }
+
public static FileSystem getFileSystem(@NonNull String path) throws
IOException {
FileSystem fileSystem =
FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), CONF);
fileSystem.setWriteChecksum(false);
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 410d9ec8c..01f5a9d27 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.hive.commit;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
@@ -37,7 +38,8 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
private final String dbName;
private final String tableName;
- public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName,
String tableName) {
+ public HiveSinkAggregatedCommitter(Config pluginConfig, String dbName,
String tableName, HadoopConf hadoopConf) {
+ super(hadoopConf);
this.pluginConfig = pluginConfig;
this.dbName = dbName;
this.tableName = tableName;
@@ -55,9 +57,9 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
.collect(Collectors.toList());
try {
hiveMetaStore.addPartitions(dbName, tableName, partitions);
- log.info("Add these partitions [{}]", partitions);
+ log.info("Add these partitions {}", partitions);
} catch (TException e) {
- log.error("Failed to add these partitions [{}]",
partitions);
+ log.error("Failed to add these partitions {}", partitions);
errorCommitInfos.add(aggregatedCommitInfo);
}
}
@@ -77,9 +79,9 @@ public class HiveSinkAggregatedCommitter extends
FileSinkAggregatedCommitter {
.collect(Collectors.toList());
try {
hiveMetaStore.dropPartitions(dbName, tableName, partitions);
- log.info("Remove these partitions [{}]", partitions);
+ log.info("Remove these partitions {}", partitions);
} catch (TException e) {
- log.error("Failed to remove these partitions [{}]",
partitions);
+ log.error("Failed to remove these partitions {}", partitions);
}
}
hiveMetaStore.close();
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index ac94e56e9..0e5a60532 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -130,6 +130,6 @@ public class HiveSink extends BaseHdfsFileSink {
@Override
public Optional<SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
- return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig,
dbName, tableName));
+ return Optional.of(new HiveSinkAggregatedCommitter(pluginConfig,
dbName, tableName, hadoopConf));
}
}