This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8a4c01fe35 [Improve][Connector-v2] The hive connector support multiple
filesystem (#6648)
8a4c01fe35 is described below
commit 8a4c01fe359adbf96c5cdd39bdaebc0175722fa7
Author: dailai <[email protected]>
AuthorDate: Thu Apr 18 13:27:00 2024 +0800
[Improve][Connector-v2] The hive connector support multiple filesystem
(#6648)
---
docs/en/connector-v2/sink/Hive.md | 177 +++++++++++++++++++++
.../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java | 8 +-
.../file/hdfs/source/BaseHdfsFileSource.java | 10 +-
.../seatunnel/file/config/HadoopConf.java | 36 ++++-
.../seatunnel/file/s3/config/S3Conf.java | 8 +-
seatunnel-connectors-v2/connector-hive/pom.xml | 15 ++
.../seatunnel/hive/config/HiveConfig.java | 16 ++
.../seatunnel/hive/config/HiveOnS3Conf.java | 57 +++++++
.../connectors/seatunnel/hive/sink/HiveSink.java | 21 ++-
.../seatunnel/hive/sink/HiveSinkFactory.java | 6 +-
.../seatunnel/hive/source/HiveSource.java | 20 ++-
.../seatunnel/hive/source/HiveSourceFactory.java | 2 +
.../seatunnel/hive/storage/AbstractStorage.java | 91 +++++++++++
.../seatunnel/hive/storage/COSStorage.java | 59 +++++++
.../seatunnel/hive/storage/HDFSStorage.java | 60 +++++++
.../seatunnel/hive/storage/OSSStorage.java | 43 +++++
.../seatunnel/hive/storage/S3Storage.java | 56 +++++++
.../connectors/seatunnel/hive/storage/Storage.java | 25 +++
.../seatunnel/hive/storage/StorageFactory.java | 32 ++++
.../seatunnel/hive/storage/StorageType.java | 25 +++
.../seatunnel/hive/storage/CosStorageTest.java | 77 +++++++++
.../seatunnel/hive/storage/HDFSStorageTest.java | 40 +++++
.../seatunnel/hive/storage/OSSStorageTest.java | 74 +++++++++
.../seatunnel/hive/storage/S3StorageTest.java | 106 ++++++++++++
.../seatunnel/hive/storage/StorageFactoryTest.java | 54 +++++++
.../src/test/resources/cos/core-site.xml | 47 ++++++
.../src/test/resources/oss/core-site.xml | 30 ++++
.../src/test/resources/s3/core-site.xml | 50 ++++++
.../connector-hive-e2e/pom.xml | 50 ++++++
.../seatunnel/e2e/connector/hive/HiveIT.java | 169 ++++++++++++++++++++
.../src/test/resources/fake_to_hive_on_cos.conf | 62 ++++++++
.../src/test/resources/fake_to_hive_on_hdfs.conf | 59 +++++++
.../src/test/resources/fake_to_hive_on_oss.conf | 62 ++++++++
.../src/test/resources/fake_to_hive_on_s3.conf | 62 ++++++++
.../src/test/resources/hive_on_cos_to_assert.conf | 76 +++++++++
.../src/test/resources/hive_on_hdfs_to_assert.conf | 73 +++++++++
.../src/test/resources/hive_on_oss_to_assert.conf | 76 +++++++++
.../src/test/resources/hive_on_s3_to_assert.conf | 76 +++++++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
.../container/seatunnel/SeaTunnelContainer.java | 6 +-
40 files changed, 1992 insertions(+), 25 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index eec92b46b1..48a0ba41bd 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -37,6 +37,8 @@ By default, we use 2PC commit to ensure `exactly-once`
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
+| hive.hadoop.conf | Map | no | - |
+| hive.hadoop.conf-path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
@@ -57,6 +59,16 @@ The path of `hdfs-site.xml`, used to load ha configuration
of namenodes
### hive_site_path [string]
+The path of `hive-site.xml`
+
+### hive.hadoop.conf [map]
+
+Properties of the Hadoop configuration (as set in 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml')
+
+### hive.hadoop.conf-path [string]
+
+The directory from which the 'core-site.xml', 'hdfs-site.xml' and
'hive-site.xml' files are loaded
+
### krb5_path [string]
The path of `krb5.conf`, used to authentication kerberos
@@ -162,6 +174,171 @@ sink {
Hive {
table_name = "test_hive.test_hive_sink_text_simple"
metastore_uri = "thrift://ctyun7:9083"
+ hive.hadoop.conf = {
+ bucket = "s3a://mybucket"
+ }
+}
+```
+
+## Hive on s3
+
+### Step 1
+
+Create the lib directory for the Hive plugin on EMR.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Download the jars from Maven Central into the lib directory.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
+wget
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your EMR environment to the lib directory.
+
+```shell
+cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 4
+
+Run the example job.
+
+```shell
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_s3"
+ metastore_uri =
"thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+ hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+ hive.hadoop.conf = {
+ bucket="s3://ws-package"
+ }
+ }
+}
+```
+
+## Hive on oss
+
+### Step 1
+
+Create the lib directory for the Hive plugin on EMR.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Download the jars from Maven Central into the lib directory.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your EMR environment to the lib directory and delete the
conflicting jar.
+
+```shell
+cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar
${SEATUNNEL_HOME}/plugins/Hive/lib
+rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
+```
+
+### Step 4
+
+Run the example job.
+
+```shell
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_oss"
+ metastore_uri =
"thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ hive.hadoop.conf = {
+ bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+ }
}
}
```
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index 763691a463..dd4aaf943b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -29,6 +29,8 @@ import
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+import java.util.Objects;
+
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
public abstract class BaseHdfsFileSink extends BaseFileSink {
@@ -44,7 +46,11 @@ public abstract class BaseHdfsFileSink extends BaseFileSink {
getPluginName(), PluginType.SINK,
result.getMsg()));
}
super.prepare(pluginConfig);
- hadoopConf = new
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
+ // Avoid overwriting hadoopConf for subclass initialization. If a
subclass is initialized,
+ // it is not initialized here.
+ if (Objects.isNull(hadoopConf)) {
+ hadoopConf = new
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
+ }
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 75fbd04e68..78f3147b17 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
import java.io.IOException;
+import java.util.Objects;
public abstract class BaseHdfsFileSource extends BaseFileSource {
@@ -56,8 +57,13 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
String path =
pluginConfig.getString(HdfsSourceConfigOptions.FILE_PATH.key());
- hadoopConf =
- new
HadoopConf(pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
+ // Avoid overwriting hadoopConf for subclass initialization. If a
subclass is initialized,
+ // it is not initialized here.
+ if (Objects.isNull(hadoopConf)) {
+ hadoopConf =
+ new HadoopConf(
+
pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
+ }
if
(pluginConfig.hasPath(HdfsSourceConfigOptions.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(
pluginConfig.getString(HdfsSourceConfigOptions.HDFS_SITE_PATH.key()));
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index 11bbe4d3ab..4e6499f4de 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -60,22 +60,50 @@ public class HadoopConf implements Serializable {
public void setExtraOptionsForConfiguration(Configuration configuration) {
if (!extraOptions.isEmpty()) {
+ removeUnwantedOverwritingProps(extraOptions);
extraOptions.forEach(configuration::set);
}
if (hdfsSitePath != null) {
- configuration.addResource(new Path(hdfsSitePath));
+ Configuration hdfsSiteConfiguration = new Configuration();
+ hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
+ unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
+ configuration.addResource(hdfsSiteConfiguration);
}
}
+ private void removeUnwantedOverwritingProps(Map extraOptions) {
+ extraOptions.remove(getFsDefaultNameKey());
+ extraOptions.remove(getHdfsImplKey());
+ extraOptions.remove(getHdfsImplDisableCacheKey());
+ }
+
+ public void unsetUnwantedOverwritingProps(Configuration
hdfsSiteConfiguration) {
+ hdfsSiteConfiguration.unset(getFsDefaultNameKey());
+ hdfsSiteConfiguration.unset(getHdfsImplKey());
+ hdfsSiteConfiguration.unset(getHdfsImplDisableCacheKey());
+ }
+
public Configuration toConfiguration() {
Configuration configuration = new Configuration();
configuration.setBoolean(READ_INT96_AS_FIXED, true);
configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
- configuration.setBoolean(String.format("fs.%s.impl.disable.cache",
getSchema()), true);
- configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
getHdfsNameKey());
- configuration.set(String.format("fs.%s.impl", getSchema()),
getFsHdfsImpl());
+ configuration.setBoolean(getHdfsImplDisableCacheKey(), true);
+ configuration.set(getFsDefaultNameKey(), getHdfsNameKey());
+ configuration.set(getHdfsImplKey(), getFsHdfsImpl());
return configuration;
}
+
+ public String getFsDefaultNameKey() {
+ return CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
+ }
+
+ public String getHdfsImplKey() {
+ return String.format("fs.%s.impl", getSchema());
+ }
+
+ public String getHdfsImplDisableCacheKey() {
+ return String.format("fs.%s.impl.disable.cache", getSchema());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
index cc35141866..2680ce151c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
@@ -29,8 +29,8 @@ import java.util.Map;
public class S3Conf extends HadoopConf {
private static final String HDFS_S3N_IMPL =
"org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL =
"org.apache.hadoop.fs.s3a.S3AFileSystem";
- private static final String S3A_SCHEMA = "s3a";
- private static final String DEFAULT_SCHEMA = "s3n";
+ protected static final String S3A_SCHEMA = "s3a";
+ protected static final String DEFAULT_SCHEMA = "s3n";
private String schema = DEFAULT_SCHEMA;
@Override
@@ -47,7 +47,7 @@ public class S3Conf extends HadoopConf {
this.schema = schema;
}
- private S3Conf(String hdfsNameKey) {
+ protected S3Conf(String hdfsNameKey) {
super(hdfsNameKey);
}
@@ -80,7 +80,7 @@ public class S3Conf extends HadoopConf {
return buildWithConfig(config);
}
- private String switchHdfsImpl() {
+ protected String switchHdfsImpl() {
switch (this.schema) {
case S3A_SCHEMA:
return HDFS_S3A_IMPL;
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml
b/seatunnel-connectors-v2/connector-hive/pom.xml
index 7744a441bf..aef3c4451a 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -46,6 +46,21 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-s3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-oss</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-cos</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index 8cf00b8c30..0afadc64d8 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -26,6 +26,9 @@ import
org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.Table;
+import java.util.HashMap;
+import java.util.Map;
+
public class HiveConfig {
public static final Option<String> TABLE_NAME =
Options.key("table_name")
@@ -51,6 +54,19 @@ public class HiveConfig {
.noDefaultValue()
.withDescription("The path of hive-site.xml");
+ public static final Option<Map<String, String>> HADOOP_CONF =
+ Options.key("hive.hadoop.conf")
+ .mapType()
+ .defaultValue(new HashMap<>())
+ .withDescription("Properties in hadoop conf");
+
+ public static final Option<String> HADOOP_CONF_PATH =
+ Options.key("hive.hadoop.conf-path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The specified loading path for the
'core-site.xml', 'hdfs-site.xml' files");
+
public static final String TEXT_INPUT_FORMAT_CLASSNAME =
"org.apache.hadoop.mapred.TextInputFormat";
public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOnS3Conf.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOnS3Conf.java
new file mode 100644
index 0000000000..01fede7517
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOnS3Conf.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+
+public class HiveOnS3Conf extends S3Conf {
+ protected static final String S3_SCHEMA = "s3";
+ // The emr of amazon on s3 use this EmrFileSystem as the file system
+ protected static final String HDFS_S3_IMPL =
"com.amazon.ws.emr.hadoop.fs.EmrFileSystem";
+
+ protected HiveOnS3Conf(String hdfsNameKey, String schema) {
+ super(hdfsNameKey);
+ setSchema(schema);
+ }
+
+ @Override
+ public String getFsHdfsImpl() {
+ return switchHdfsImpl();
+ }
+
+ @Override
+ protected String switchHdfsImpl() {
+ return getSchema().equals(S3_SCHEMA) ? HDFS_S3_IMPL :
super.switchHdfsImpl();
+ }
+
+ public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig
readonlyConfig) {
+ S3Conf s3Conf = (S3Conf)
S3Conf.buildWithReadOnlyConfig(readonlyConfig);
+ String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
+ if (bucketName.startsWith(DEFAULT_SCHEMA)) {
+ s3Conf.setSchema(DEFAULT_SCHEMA);
+ } else if (bucketName.startsWith(S3A_SCHEMA)) {
+ s3Conf.setSchema(S3A_SCHEMA);
+ } else {
+ s3Conf.setSchema(S3_SCHEMA);
+ }
+ return new HiveOnS3Conf(s3Conf.getHdfsNameKey(), s3Conf.getSchema());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 1c208b47da..078ce83fd1 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -29,7 +30,6 @@ import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
import
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedC
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -177,11 +178,19 @@ public class HiveSink extends BaseHdfsFileSink {
.withValue(SINK_COLUMNS.key(),
ConfigValueFactory.fromAnyRef(sinkFields))
.withValue(
PARTITION_BY.key(),
ConfigValueFactory.fromAnyRef(partitionKeys));
- String hdfsLocation = tableInformation.getSd().getLocation();
+ String hiveSdLocation = tableInformation.getSd().getLocation();
try {
- URI uri = new URI(hdfsLocation);
- String path = uri.getPath();
- hadoopConf = new HadoopConf(hdfsLocation.replace(path, ""));
+ /**
+ * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop
conf can be
+ * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy
can obtain the
+ * correct Schema and FsHdfsImpl that can be filled into hadoop
configuration in {@link
+ *
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+ */
+ hadoopConf =
+ StorageFactory.getStorageType(hiveSdLocation)
+ .buildHadoopConfWithReadOnlyConfig(
+ ReadonlyConfig.fromConfig(pluginConfig));
+ String path = new URI(hiveSdLocation).getPath();
pluginConfig =
pluginConfig
.withValue(FILE_PATH.key(),
ConfigValueFactory.fromAnyRef(path))
@@ -193,7 +202,7 @@ public class HiveSink extends BaseHdfsFileSink {
String.format(
"Get hdfs namenode host from table location [%s]
failed,"
+ "please check it",
- hdfsLocation);
+ hiveSdLocation);
throw new HiveConnectorException(
HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED,
errorMsg, e);
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index b98f6cffa5..e40864517f 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -24,8 +24,6 @@ import
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
-
@AutoService(Factory.class)
public class HiveSinkFactory implements TableSinkFactory {
@Override
@@ -38,7 +36,9 @@ public class HiveSinkFactory implements TableSinkFactory {
return OptionRule.builder()
.required(HiveConfig.TABLE_NAME)
.required(HiveConfig.METASTORE_URI)
- .optional(ABORT_DROP_PARTITION_METADATA)
+ .optional(HiveConfig.ABORT_DROP_PARTITION_METADATA)
+ .optional(HiveConfig.HADOOP_CONF)
+ .optional(HiveConfig.HADOOP_CONF_PATH)
.build();
}
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 071919ea95..f7642e3611 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -25,6 +25,7 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SqlType;
@@ -40,6 +41,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSo
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -153,11 +155,11 @@ public class HiveSource extends BaseHdfsFileSource {
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"Hive connector only support [text parquet orc] table
now");
}
- String hdfsLocation = tableInformation.getSd().getLocation();
+ String hiveSdLocation = tableInformation.getSd().getLocation();
try {
- URI uri = new URI(hdfsLocation);
+ URI uri = new URI(hiveSdLocation);
String path = uri.getPath();
- String defaultFs = hdfsLocation.replace(path, "");
+ String defaultFs = hiveSdLocation.replace(path, "");
pluginConfig =
pluginConfig
.withValue(
@@ -165,12 +167,22 @@ public class HiveSource extends BaseHdfsFileSource {
ConfigValueFactory.fromAnyRef(path))
.withValue(
FS_DEFAULT_NAME_KEY,
ConfigValueFactory.fromAnyRef(defaultFs));
+ /**
+ * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop
conf can be
+ * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy
can obtain the
+ * correct Schema and FsHdfsImpl that can be filled into hadoop
configuration in {@link
+ *
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+ */
+ hadoopConf =
+ StorageFactory.getStorageType(hiveSdLocation)
+ .buildHadoopConfWithReadOnlyConfig(
+ ReadonlyConfig.fromConfig(pluginConfig));
} catch (URISyntaxException e) {
String errorMsg =
String.format(
"Get hdfs namenode host from table location [%s]
failed,"
+ "please check it",
- hdfsLocation);
+ hiveSdLocation);
throw new HiveConnectorException(
HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED,
errorMsg, e);
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index 385738b2e8..afa1ae0e36 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -40,6 +40,8 @@ public class HiveSourceFactory implements TableSourceFactory {
.required(HiveConfig.METASTORE_URI)
.optional(BaseSourceConfigOptions.READ_PARTITIONS)
.optional(BaseSourceConfigOptions.READ_COLUMNS)
+ .optional(HiveConfig.HADOOP_CONF)
+ .optional(HiveConfig.HADOOP_CONF_PATH)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
new file mode 100644
index 0000000000..0e61226b1e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+@Slf4j
+public abstract class AbstractStorage implements Storage {
+ private static final Option BUCKET_OPTION =
Options.key("bucket").stringType().noDefaultValue();
+ private static final List<String> HADOOP_CONF_FILES =
+ ImmutableList.of("core-site.xml", "hdfs-site.xml",
"hive-site.xml");
+
+ protected Config fillBucket(ReadonlyConfig readonlyConfig, Configuration
configuration) {
+ Config config = readonlyConfig.toConfig();
+ String bucketValue = configuration.get(BUCKET_OPTION.key());
+ if (StringUtils.isBlank(bucketValue)) {
+ throw new RuntimeException(
+ "There is no bucket property in conf which load from
[hadoop_conf_path,hadoop_conf].");
+ }
+ config = config.withValue(BUCKET_OPTION.key(),
ConfigValueFactory.fromAnyRef(bucketValue));
+ return config;
+ }
+
+ /**
+ * Loading Hadoop configuration by hadoop conf path or props set by
hive.hadoop.conf
+ *
+ * @return
+ */
+ protected Configuration loadHiveBaseHadoopConfig(ReadonlyConfig
readonlyConfig) {
+ Configuration configuration = new Configuration();
+ // Try to load from hadoop_conf_path(The Bucket configuration is
typically in core-site.xml)
+ Optional<String> hadoopConfPath =
readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
+ if (hadoopConfPath.isPresent()) {
+ HADOOP_CONF_FILES.forEach(
+ confFile -> {
+ java.nio.file.Path path =
Paths.get(hadoopConfPath.get(), confFile);
+ if (Files.exists(path)) {
+ try {
+
configuration.addResource(path.toUri().toURL());
+ } catch (IOException e) {
+ log.warn(
+ "Error adding Hadoop resource {},
resource was not added",
+ path,
+ e);
+ }
+ }
+ });
+ }
+ // Try to load from hadoopConf
+ Optional<Map<String, String>> hadoopConf =
+ readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
+ if (hadoopConf.isPresent()) {
+ hadoopConf.get().forEach((k, v) -> configuration.set(k, v));
+ }
+ return configuration;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/COSStorage.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/COSStorage.java
new file mode 100644
index 0000000000..0b2976424c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/COSStorage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfigOptions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** {@link Storage} implementation that produces a {@link CosConf} for Hive tables on COS. */
+public class COSStorage extends AbstractStorage {
+
+    /**
+     * Loads the Hadoop configuration, fills in the bucket, copies the COS secret id, secret key
+     * and region from the Hadoop configuration into the connector config, and builds the
+     * resulting {@link CosConf}.
+     *
+     * @param readonlyConfig Hive connector options
+     * @return a COS Hadoop conf whose extra options are all loaded Hadoop properties
+     */
+    @Override
+    public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        final Configuration hadoopConfiguration = loadHiveBaseHadoopConfig(readonlyConfig);
+        Config cosConfig = fillBucket(readonlyConfig, hadoopConfiguration);
+        // Keys mirrored from the Hadoop configuration, applied in this order.
+        final String[] mirroredKeys = {
+            CosConfigOptions.SECRET_ID.key(),
+            CosConfigOptions.SECRET_KEY.key(),
+            CosConfigOptions.REGION.key()
+        };
+        for (String key : mirroredKeys) {
+            cosConfig =
+                    cosConfig.withValue(
+                            key, ConfigValueFactory.fromAnyRef(hadoopConfiguration.get(key)));
+        }
+        final HadoopConf cosHadoopConf = CosConf.buildWithConfig(cosConfig);
+        final Map<String, String> extraOptions =
+                hadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY);
+        cosHadoopConf.setExtraOptions(extraOptions);
+        return cosHadoopConf;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorage.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorage.java
new file mode 100644
index 0000000000..d4415e96f6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+public class HDFSStorage extends AbstractStorage {
+
+ private String hiveSdLocation;
+
+ public HDFSStorage(String hiveSdLocation) {
+ this.hiveSdLocation = hiveSdLocation;
+ }
+
+ @Override
+ public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig
readonlyConfig) {
+ try {
+ String path = new URI(hiveSdLocation).getPath();
+ HadoopConf hadoopConf = new
HadoopConf(hiveSdLocation.replace(path, StringUtils.EMPTY));
+ Configuration configuration =
loadHiveBaseHadoopConfig(readonlyConfig);
+ Map<String, String> propsInConfiguration =
+ configuration.getPropsWithPrefix(StringUtils.EMPTY);
+ hadoopConf.setExtraOptions(propsInConfiguration);
+ return hadoopConf;
+ } catch (URISyntaxException e) {
+ String errorMsg =
+ String.format(
+ "Get hdfs namenode host from table location [%s]
failed,"
+ + "please check it",
+ hiveSdLocation);
+ throw new HiveConnectorException(
+ HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED,
errorMsg, e);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorage.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorage.java
new file mode 100644
index 0000000000..eb3ce67851
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssHadoopConf;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** {@link Storage} implementation that produces an {@link OssHadoopConf} for Hive tables on OSS. */
+public class OSSStorage extends AbstractStorage {
+
+    /**
+     * Loads the Hadoop configuration, fills in the bucket and builds the {@link OssHadoopConf}.
+     *
+     * @param readonlyConfig Hive connector options
+     * @return an OSS Hadoop conf whose extra options are all loaded Hadoop properties
+     */
+    @Override
+    public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        final Configuration hadoopConfiguration = loadHiveBaseHadoopConfig(readonlyConfig);
+        final Config withBucket = fillBucket(readonlyConfig, hadoopConfiguration);
+        final ReadonlyConfig ossOptions = ReadonlyConfig.fromConfig(withBucket);
+        final HadoopConf ossHadoopConf = OssHadoopConf.buildWithConfig(ossOptions);
+        final Map<String, String> extraOptions =
+                hadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY);
+        ossHadoopConf.setExtraOptions(extraOptions);
+        return ossHadoopConf;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3Storage.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3Storage.java
new file mode 100644
index 0000000000..877d14dfc4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3Storage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOnS3Conf;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** {@link Storage} implementation that produces a {@link HiveOnS3Conf} for Hive tables on S3. */
+public class S3Storage extends AbstractStorage {
+
+    /**
+     * Loads the Hadoop configuration, fills in the bucket, copies the credentials provider and
+     * the endpoint from the Hadoop configuration into the connector config, and builds the
+     * resulting {@link HiveOnS3Conf}.
+     *
+     * @param readonlyConfig Hive connector options
+     * @return an S3 Hadoop conf whose extra options are all loaded Hadoop properties
+     */
+    @Override
+    public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        final Configuration hadoopConfiguration = loadHiveBaseHadoopConfig(readonlyConfig);
+        Config s3Config = fillBucket(readonlyConfig, hadoopConfiguration);
+        // Keys mirrored from the Hadoop configuration, applied in this order.
+        final String[] mirroredKeys = {
+            S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
+            S3ConfigOptions.FS_S3A_ENDPOINT.key()
+        };
+        for (String key : mirroredKeys) {
+            s3Config =
+                    s3Config.withValue(
+                            key, ConfigValueFactory.fromAnyRef(hadoopConfiguration.get(key)));
+        }
+        final HadoopConf s3HadoopConf =
+                HiveOnS3Conf.buildWithReadOnlyConfig(ReadonlyConfig.fromConfig(s3Config));
+        final Map<String, String> extraOptions =
+                hadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY);
+        s3HadoopConf.setExtraOptions(extraOptions);
+        return s3HadoopConf;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/Storage.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/Storage.java
new file mode 100644
index 0000000000..24df3e76d5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/Storage.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+/**
+ * Builds the {@link HadoopConf} used to access the file system that backs a Hive table.
+ * Implementations in this package exist for S3, OSS, COS and HDFS.
+ */
+public interface Storage {
+ /**
+  * Creates the Hadoop conf for this storage from the Hive connector options.
+  *
+  * @param readonlyConfig Hive connector options
+  * @return the Hadoop conf for this storage
+  */
+ HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig
readonlyConfig);
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactory.java
new file mode 100644
index 0000000000..8a08f2be97
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+/** Selects the {@link Storage} implementation from the prefix of a Hive table location. */
+public class StorageFactory {
+
+    /**
+     * Returns the storage whose lower-cased {@link StorageType} name is a prefix of the given
+     * location. S3, OSS and COS are checked in that order; any other location is handled by
+     * {@link HDFSStorage}.
+     *
+     * @param hiveSdLocation table location, for example {@code s3a://bucket/path}
+     * @return a new storage instance for the location
+     */
+    public static Storage getStorageType(String hiveSdLocation) {
+        if (hasTypePrefix(hiveSdLocation, StorageType.S3)) {
+            return new S3Storage();
+        }
+        if (hasTypePrefix(hiveSdLocation, StorageType.OSS)) {
+            return new OSSStorage();
+        }
+        if (hasTypePrefix(hiveSdLocation, StorageType.COS)) {
+            return new COSStorage();
+        }
+        return new HDFSStorage(hiveSdLocation);
+    }
+
+    /** Tells whether the location starts with the lower-cased name of the storage type. */
+    private static boolean hasTypePrefix(String location, StorageType type) {
+        return location.startsWith(type.name().toLowerCase());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageType.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageType.java
new file mode 100644
index 0000000000..49a7601977
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+/**
+ * Storage back ends known to the Hive connector. {@code StorageFactory} compares the
+ * lower-cased names of S3, OSS and COS with the beginning of a table location.
+ */
+public enum StorageType {
+ // Prefix "s3"; also matches locations starting with "s3a" and "s3n".
+ S3,
+ // Prefix "oss".
+ OSS,
+ // Prefix "cos"; also matches locations starting with "cosn".
+ COS,
+ // Not used as a prefix by StorageFactory, which falls back to HDFSStorage for every other location.
+ HDFS
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/CosStorageTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/CosStorageTest.java
new file mode 100644
index 0000000000..a8f15b4b91
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/CosStorageTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfigOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+
+/** Unit tests for {@link COSStorage}. */
+public class CosStorageTest {
+
+    private static final ReadonlyConfig COS = ReadonlyConfig.fromMap(cosOptions());
+
+    /** Connector options carrying bucket, secret id, secret key and region in hive.hadoop.conf. */
+    private static HashMap<String, Object> cosOptions() {
+        // Plain maps instead of double-brace initialization, which creates anonymous subclasses.
+        HashMap<String, String> hadoopConf = new HashMap<>();
+        hadoopConf.put("bucket", "cosn://my_bucket");
+        hadoopConf.put(CosConfigOptions.SECRET_ID.key(), "test");
+        hadoopConf.put(CosConfigOptions.SECRET_KEY.key(), "test");
+        hadoopConf.put(CosConfigOptions.REGION.key(), "ap-shanghai");
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("hive.hadoop.conf", hadoopConf);
+        return options;
+    }
+
+    @Test
+    void fillBucketInHadoopConf() {
+        COSStorage cosStorage = new COSStorage();
+        HadoopConf cosnConf = cosStorage.buildHadoopConfWithReadOnlyConfig(COS);
+        assertHadoopConf(cosnConf);
+    }
+
+    @Test
+    void fillBucketInHadoopConfPath() throws URISyntaxException {
+        URL resource = CosStorageTest.class.getResource("/cos");
+        // Fail with a clear message instead of a NullPointerException if the resource is missing.
+        Assertions.assertNotNull(resource, "Test resource directory /cos was not found");
+        String filePath = Paths.get(resource.toURI()).toString();
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("hive.hadoop.conf-path", filePath);
+        map.putAll(COS.toMap());
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+        COSStorage cosStorage = new COSStorage();
+        HadoopConf hadoopConf = cosStorage.buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        assertHadoopConf(hadoopConf);
+    }
+
+    private static void assertHadoopConf(HadoopConf cosnConf) {
+        Assertions.assertTrue(cosnConf instanceof CosConf);
+        // JUnit 5 expects (expected, actual); the reversed order yields misleading messages.
+        Assertions.assertEquals("cosn", cosnConf.getSchema());
+        Assertions.assertEquals("org.apache.hadoop.fs.CosFileSystem", cosnConf.getFsHdfsImpl());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorageTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorageTest.java
new file mode 100644
index 0000000000..da94a24537
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorageTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+/** Unit tests for {@link HDFSStorage}. */
+public class HDFSStorageTest {
+
+    @Test
+    void fillbuildHadoopConfWithReadOnlyConfig() {
+        HDFSStorage hdfsStorage = new HDFSStorage("hdfs://tmp/test");
+        HadoopConf hadoopConf =
+                hdfsStorage.buildHadoopConfWithReadOnlyConfig(
+                        ReadonlyConfig.fromMap(new HashMap<>(0)));
+        // JUnit 5 expects (expected, actual); the reversed order yields misleading messages.
+        Assertions.assertEquals("hdfs", hadoopConf.getSchema());
+        Assertions.assertEquals(
+                "org.apache.hadoop.hdfs.DistributedFileSystem", hadoopConf.getFsHdfsImpl());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorageTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorageTest.java
new file mode 100644
index 0000000000..488fd33f98
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorageTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssHadoopConf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+
+/** Unit tests for {@link OSSStorage}. */
+public class OSSStorageTest {
+
+    private static final ReadonlyConfig OSS = ReadonlyConfig.fromMap(ossOptions());
+
+    /** Connector options carrying only the bucket in hive.hadoop.conf. */
+    private static HashMap<String, Object> ossOptions() {
+        // Plain maps instead of double-brace initialization, which creates anonymous subclasses.
+        HashMap<String, String> hadoopConf = new HashMap<>();
+        hadoopConf.put("bucket", "oss://my_bucket");
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("hive.hadoop.conf", hadoopConf);
+        return options;
+    }
+
+    @Test
+    void fillBucketInHadoopConf() {
+        OSSStorage ossStorage = new OSSStorage();
+        HadoopConf ossnConf = ossStorage.buildHadoopConfWithReadOnlyConfig(OSS);
+        assertHadoopConf(ossnConf);
+    }
+
+    @Test
+    void fillBucketInHadoopConfPath() throws URISyntaxException {
+        URL resource = OSSStorageTest.class.getResource("/oss");
+        // Fail with a clear message instead of a NullPointerException if the resource is missing.
+        Assertions.assertNotNull(resource, "Test resource directory /oss was not found");
+        String filePath = Paths.get(resource.toURI()).toString();
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("hive.hadoop.conf-path", filePath);
+        map.putAll(OSS.toMap());
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+        OSSStorage ossStorage = new OSSStorage();
+        HadoopConf hadoopConf = ossStorage.buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        assertHadoopConf(hadoopConf);
+    }
+
+    private void assertHadoopConf(HadoopConf ossnConf) {
+        Assertions.assertTrue(ossnConf instanceof OssHadoopConf);
+        // JUnit 5 expects (expected, actual); the reversed order yields misleading messages.
+        Assertions.assertEquals("oss", ossnConf.getSchema());
+        Assertions.assertEquals(
+                "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem", ossnConf.getFsHdfsImpl());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3StorageTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3StorageTest.java
new file mode 100644
index 0000000000..52edf0fb4d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3StorageTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOnS3Conf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+
+/** Unit tests for {@link S3Storage} with s3a and s3 buckets. */
+public class S3StorageTest {
+
+    private static final ReadonlyConfig S3A =
+            ReadonlyConfig.fromMap(
+                    s3Options(
+                            "s3a://my_bucket",
+                            "provider",
+                            "http://s3.ap-northeast-1.amazonaws.com"));
+
+    private static final ReadonlyConfig S3 =
+            ReadonlyConfig.fromMap(s3Options("s3://my_bucket", "testProvider", "test"));
+
+    /** Connector options carrying bucket, credentials provider and endpoint in hive.hadoop.conf. */
+    private static HashMap<String, Object> s3Options(
+            String bucket, String credentialsProvider, String endpoint) {
+        // Plain maps instead of double-brace initialization, which creates anonymous subclasses.
+        HashMap<String, String> hadoopConf = new HashMap<>();
+        hadoopConf.put(S3ConfigOptions.S3_BUCKET.key(), bucket);
+        hadoopConf.put(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(), credentialsProvider);
+        hadoopConf.put(S3ConfigOptions.FS_S3A_ENDPOINT.key(), endpoint);
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("hive.hadoop.conf", hadoopConf);
+        return options;
+    }
+
+    @Test
+    void fillBucketInHadoopConf() {
+        S3Storage s3Storage = new S3Storage();
+        HadoopConf s3aConf = s3Storage.buildHadoopConfWithReadOnlyConfig(S3A);
+        assertHadoopConfForS3a(s3aConf);
+
+        HadoopConf s3Conf = s3Storage.buildHadoopConfWithReadOnlyConfig(S3);
+        Assertions.assertTrue(s3Conf instanceof HiveOnS3Conf);
+        // JUnit 5 expects (expected, actual); the reversed order yields misleading messages.
+        Assertions.assertEquals("s3", s3Conf.getSchema());
+        Assertions.assertEquals(
+                "com.amazon.ws.emr.hadoop.fs.EmrFileSystem", s3Conf.getFsHdfsImpl());
+    }
+
+    @Test
+    void fillBucketInHadoopConfPath() throws URISyntaxException {
+        URL resource = S3StorageTest.class.getResource("/s3");
+        // Fail with a clear message instead of a NullPointerException if the resource is missing.
+        Assertions.assertNotNull(resource, "Test resource directory /s3 was not found");
+        String filePath = Paths.get(resource.toURI()).toString();
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("hive.hadoop.conf-path", filePath);
+        map.putAll(S3A.toMap());
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+        S3Storage s3Storage = new S3Storage();
+        HadoopConf hadoopConf = s3Storage.buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        assertHadoopConfForS3a(hadoopConf);
+    }
+
+    private void assertHadoopConfForS3a(HadoopConf s3aConf) {
+        Assertions.assertTrue(s3aConf instanceof HiveOnS3Conf);
+        Assertions.assertEquals("s3a", s3aConf.getSchema());
+        Assertions.assertEquals("org.apache.hadoop.fs.s3a.S3AFileSystem", s3aConf.getFsHdfsImpl());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactoryTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactoryTest.java
new file mode 100644
index 0000000000..cd4d99bfaa
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactoryTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Unit tests for {@link StorageFactory}. */
+public class StorageFactoryTest {
+
+    private static final Map<String, Class<? extends Storage>> STORAGE_MAP = expectedStorages();
+
+    /** Maps sample table locations to the storage class the factory is expected to return. */
+    private static Map<String, Class<? extends Storage>> expectedStorages() {
+        // Typed map instead of a raw, double-brace initialized HashMap (unchecked assignment).
+        Map<String, Class<? extends Storage>> expected = new HashMap<>();
+        expected.put("hdfs://path/to/", HDFSStorage.class);
+        expected.put("s3n://path/to/", S3Storage.class);
+        expected.put("s3://ws-package/hive/test_hive.db/test_hive_sink_on_s3", S3Storage.class);
+        expected.put("s3a://path/to/", S3Storage.class);
+        expected.put("oss://path/to/", OSSStorage.class);
+        expected.put("cosn://path/to/", COSStorage.class);
+        return expected;
+    }
+
+    @Test
+    void testStorageType() {
+        STORAGE_MAP.forEach(
+                (location, expectedStorageClass) -> {
+                    Storage storage = StorageFactory.getStorageType(location);
+                    Assertions.assertNotNull(storage);
+                    // Name the location so a failure identifies the offending entry.
+                    Assertions.assertTrue(
+                            expectedStorageClass.isInstance(storage),
+                            "Unexpected storage type for location " + location);
+                });
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/resources/cos/core-site.xml
b/seatunnel-connectors-v2/connector-hive/src/test/resources/cos/core-site.xml
new file mode 100644
index 0000000000..e8cd95dfd3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/resources/cos/core-site.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>cosn://mybucket</value>
+ </property>
+ <property>
+ <name>fs.cosn.impl</name>
+ <value>org.apache.hadoop.fs.CosNFileSystem</value>
+ </property>
+ <property>
+ <name>fs.AbstractFileSystem.cosn.impl</name>
+ <value>org.apache.hadoop.fs.CosN</value>
+ </property>
+ <property>
+ <name>fs.cosn.credentials.provider</name>
+ <value>org.apache.hadoop.fs.auth.SimpleCredentialProvider</value>
+ </property>
+ <property>
+ <name>secret_id</name>
+ <value>your-cosn-secret_id</value>
+ </property>
+ <property>
+ <name>secret_key</name>
+ <value>your-secret_key</value>
+ </property>
+ <property>
+ <name>region</name>
+ <value>your-region</value>
+ </property>
+</configuration>
+
+
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/resources/oss/core-site.xml
b/seatunnel-connectors-v2/connector-hive/src/test/resources/oss/core-site.xml
new file mode 100644
index 0000000000..4ac8474938
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/resources/oss/core-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>oss://mybucket</value>
+ </property>
+ <property>
+ <name>fs.oss.accessKeyId</name>
+ <value>your-access-key-id</value>
+ </property>
+ <property>
+ <name>fs.oss.accessKeySecret</name>
+ <value>your-access-key-secret</value>
+ </property>
+</configuration>
+
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/resources/s3/core-site.xml
b/seatunnel-connectors-v2/connector-hive/src/test/resources/s3/core-site.xml
new file mode 100644
index 0000000000..34a28151c2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/resources/s3/core-site.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>s3a://mybucket</value>
+ </property>
+ <property>
+ <name>fs.s3a.access.key</name>
+ <value>*******</value>
+ </property>
+ <property>
+ <name>fs.s3a.secret.key</name>
+ <value>*******</value>
+ </property>
+ <property>
+ <name>fs.s3a.connection.ssl.enabled</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>fs.s3a.path.style.access</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>fs.s3a.endpoint</name>
+ <value>http://s3.ap-northeast-1.amazonaws.com</value>
+ </property>
+ <property>
+ <name>fs.s3a.impl</name>
+ <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+ </property>
+ <property>
+ <name>hadoop.tmp.dir</name>
+ <value>/hadoop/tmp</value>
+ <description>A base for other temporary directories.</description>
+ </property>
+</configuration>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/pom.xml
new file mode 100644
index 0000000000..bd9a112b6a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-hive-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : Hive</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-hive</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+ <version>${project.version}</version>
+ <classifier>optional</classifier>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
new file mode 100644
index 0000000000..fbe0b2503b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hive;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * End-to-end test that writes FakeSource rows into Hive tables and reads them back through the
+ * Hive source into an Assert sink, once per storage backend (HDFS, S3, OSS and COS).
+ *
+ * <p>The suite is disabled by default because it needs an external Hadoop/Hive environment. To
+ * run it locally:
+ *
+ * <ul>
+ *   <li>point {@link #HADOOP_HIVE_CONF_PATH_LOCAL} at a directory holding the Hadoop/Hive
+ *       configuration (it may be overridden with the {@code hadoop.hive.conf.path.local} system
+ *       property);
+ *   <li>replace the {@code ${IPxx}} placeholders in the {@code /etc/hosts} commands below.
+ * </ul>
+ */
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK})
+@Disabled(
+        "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this test, "
+                + "please set up your own environment in the test case file, "
+                + "hadoop_hive_conf_path_local and ip below")
+@Slf4j
+public class HiveIT extends TestSuiteBase implements TestResource {
+    /** System property that overrides the host-side Hadoop/Hive configuration directory. */
+    private static final String HADOOP_HIVE_CONF_PATH_PROPERTY = "hadoop.hive.conf.path.local";
+
+    /** Host directory copied into the container; defaults to the original hard-coded path. */
+    private static final String HADOOP_HIVE_CONF_PATH_LOCAL =
+            System.getProperty(
+                    HADOOP_HIVE_CONF_PATH_PROPERTY,
+                    "/Users/dailai/software/hadoop-3.3.3/etc/hadoop");
+
+    /** Directory inside the container that receives the Hadoop/Hive configuration. */
+    private static final String HADOOP_HIVE_CONF_PATH_IN_CONTAINER = "/tmp/hadoop";
+
+    /** Directory inside the container into which the extra jars are downloaded. */
+    private static final String HIVE_PLUGIN_LIB_DIR = "/tmp/seatunnel/plugins/Hive/lib";
+
+    private static final String HIVE_EXEC_URL =
+            "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar";
+
+    private static final String HADOOP_AWS_URL =
+            "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar";
+
+    private static final String ALIYUN_SDK_OSS_URL =
+            "https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.4.1/aliyun-sdk-oss-3.4.1.jar";
+
+    private static final String JDOM_URL =
+            "https://repo1.maven.org/maven2/org/jdom/jdom/1.1/jdom-1.1.jar";
+
+    private static final String HADOOP_ALIYUN_URL =
+            "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.1.4/hadoop-aliyun-3.1.4.jar";
+
+    private static final String HADOOP_COS_URL =
+            "https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
+
+    /**
+     * Builds the shell command that changes into the plugin lib directory and downloads every
+     * given URL with {@code wget}, chaining the steps with {@code &&} so the first failure stops
+     * the command.
+     */
+    private static String downloadCommand(String... urls) {
+        StringBuilder command = new StringBuilder("cd " + HIVE_PLUGIN_LIB_DIR);
+        for (String url : urls) {
+            command.append(" && wget ").append(url);
+        }
+        return command.toString();
+    }
+
+    /** Fails the test, reporting the command's stderr, when the exit code is not zero. */
+    private static void assertSuccess(Container.ExecResult result) {
+        Assertions.assertEquals(0, result.getExitCode(), result.getStderr());
+    }
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                // Make /etc/hosts writable, then append the cluster host names.
+                // NOTE: ${IP01}..${IP06} are expanded by the shell inside the container, so they
+                // have to be replaced with (or exported as) real addresses before running.
+                container.execInContainer("sh", "-c", "chmod -R 777 /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP01} hadoop01\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP02} hadoop02\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP03} hadoop03\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP04} hadoop04\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP05} hadoop05\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP06} hadoop06\" >> /etc/hosts");
+
+                Assertions.assertTrue(
+                        new File(HADOOP_HIVE_CONF_PATH_LOCAL).exists(),
+                        HADOOP_HIVE_CONF_PATH_LOCAL + " must exist");
+                container.execInContainer(
+                        "sh", "-c", "mkdir -p " + HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
+                container.execInContainer(
+                        "sh", "-c", "chmod -R 777 " + HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
+                // Copy local hadoop conf and hive conf to the container
+                container.copyFileToContainer(
+                        MountableFile.forHostPath(HADOOP_HIVE_CONF_PATH_LOCAL),
+                        HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
+
+                // The jar of hive-exec (this step also creates the plugin lib directory)
+                assertSuccess(
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "mkdir -p "
+                                        + HIVE_PLUGIN_LIB_DIR
+                                        + " && "
+                                        + downloadCommand(HIVE_EXEC_URL)));
+                // The jar of s3
+                assertSuccess(
+                        container.execInContainer("sh", "-c", downloadCommand(HADOOP_AWS_URL)));
+                // The jars of oss
+                assertSuccess(
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                downloadCommand(ALIYUN_SDK_OSS_URL, JDOM_URL, HADOOP_ALIYUN_URL)));
+                // The jar of cos
+                assertSuccess(
+                        container.execInContainer("sh", "-c", downloadCommand(HADOOP_COS_URL)));
+            };
+
+    @Override
+    public void startUp() throws Exception {}
+
+    @Override
+    public void tearDown() throws Exception {}
+
+    /**
+     * Runs the write job and then the read/assert job, failing with the job's stderr when either
+     * one exits with a non-zero code.
+     *
+     * @param container engine container that executes the jobs
+     * @param job1 resource path of the job that writes into Hive
+     * @param job2 resource path of the job that reads from Hive and asserts the rows
+     */
+    private void executeJob(TestContainer container, String job1, String job2)
+            throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob(job1);
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        Container.ExecResult readResult = container.executeJob(job2);
+        Assertions.assertEquals(0, readResult.getExitCode(), readResult.getStderr());
+    }
+
+    @TestTemplate
+    public void testFakeSinkHiveOnHDFS(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_hdfs.conf", "/hive_on_hdfs_to_assert.conf");
+    }
+
+    @TestTemplate
+    public void testFakeSinkHiveOnS3(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_s3.conf", "/hive_on_s3_to_assert.conf");
+    }
+
+    @TestTemplate
+    public void testFakeSinkHiveOnOSS(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_oss.conf", "/hive_on_oss_to_assert.conf");
+    }
+
+    @TestTemplate
+    public void testFakeSinkHiveOnCos(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_cos.conf", "/hive_on_cos_to_assert.conf");
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_cos.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_cos.conf
new file mode 100644
index 0000000000..4e7cc1468b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_cos.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_cos"
+ metastore_uri = "thrift://hadoop04:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ hive.hadoop.conf = {
+ bucket="cosn://emr-cosn.com"
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs.conf
new file mode 100644
index 0000000000..db0e9cdc6c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_hdfs"
+ metastore_uri = "thrift://hadoop04:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_oss.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_oss.conf
new file mode 100644
index 0000000000..a3780e0c07
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_oss.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_oss"
+ metastore_uri =
"thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ hive.hadoop.conf = {
+ bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_s3.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_s3.conf
new file mode 100644
index 0000000000..963c680394
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_s3.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_s3"
+ metastore_uri =
"thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+ hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+ hive.hadoop.conf = {
+ bucket="s3://ws-package"
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_cos_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_cos_to_assert.conf
new file mode 100644
index 0000000000..bc43786d5a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_cos_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_cos"
+ metastore_uri = "thrift://hadoop04:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ hive.hadoop.conf = {
+ bucket="cosn://emr-cosn.com"
+ }
+ result_table_name = hive_source
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert.conf
new file mode 100644
index 0000000000..87df421a2d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_hdfs"
+ metastore_uri = "thrift://hadoop04:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ result_table_name = hive_source
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_oss_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_oss_to_assert.conf
new file mode 100644
index 0000000000..4968a825c2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_oss_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_oss"
+ metastore_uri =
"thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ hive.hadoop.conf = {
+ bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+ }
+ result_table_name = hive_source
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_s3_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_s3_to_assert.conf
new file mode 100644
index 0000000000..e6e5b424e7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_s3_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "test_hive.test_hive_sink_on_s3"
+ metastore_uri =
"thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+ hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+ hive.hadoop.conf = {
+ bucket="s3://ws-package"
+ }
+ result_table_name = hive_source
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 459c49a4d0..45c78dbf70 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -67,6 +67,7 @@
<module>connector-easysearch-e2e</module>
<module>connector-cdc-postgres-e2e</module>
<module>connector-cdc-oracle-e2e</module>
+ <module>connector-hive-e2e</module>
</modules>
<dependencies>
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 1f59f302f8..37af5011ef 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -352,7 +352,11 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
|| s.contains(
"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
|| s.startsWith("Log4j2-TF-")
- || aqsThread.matcher(s).matches();
+ || aqsThread.matcher(s).matches()
+ // Background lease-renewer thread of the HDFS client
+ || s.startsWith("LeaseRenewer")
+ // Thread created when reading from HDFS; it always stays in running status
+ || s.startsWith("org.apache.hadoop.hdfs.PeerCache");
}
private void classLoaderObjectCheck(Integer maxSize) throws IOException,
InterruptedException {