This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8a4c01fe35 [Improve][Connector-v2] The hive connector support multiple 
filesystem (#6648)
8a4c01fe35 is described below

commit 8a4c01fe359adbf96c5cdd39bdaebc0175722fa7
Author: dailai <[email protected]>
AuthorDate: Thu Apr 18 13:27:00 2024 +0800

    [Improve][Connector-v2] The hive connector support multiple filesystem 
(#6648)
---
 docs/en/connector-v2/sink/Hive.md                  | 177 +++++++++++++++++++++
 .../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java |   8 +-
 .../file/hdfs/source/BaseHdfsFileSource.java       |  10 +-
 .../seatunnel/file/config/HadoopConf.java          |  36 ++++-
 .../seatunnel/file/s3/config/S3Conf.java           |   8 +-
 seatunnel-connectors-v2/connector-hive/pom.xml     |  15 ++
 .../seatunnel/hive/config/HiveConfig.java          |  16 ++
 .../seatunnel/hive/config/HiveOnS3Conf.java        |  57 +++++++
 .../connectors/seatunnel/hive/sink/HiveSink.java   |  21 ++-
 .../seatunnel/hive/sink/HiveSinkFactory.java       |   6 +-
 .../seatunnel/hive/source/HiveSource.java          |  20 ++-
 .../seatunnel/hive/source/HiveSourceFactory.java   |   2 +
 .../seatunnel/hive/storage/AbstractStorage.java    |  91 +++++++++++
 .../seatunnel/hive/storage/COSStorage.java         |  59 +++++++
 .../seatunnel/hive/storage/HDFSStorage.java        |  60 +++++++
 .../seatunnel/hive/storage/OSSStorage.java         |  43 +++++
 .../seatunnel/hive/storage/S3Storage.java          |  56 +++++++
 .../connectors/seatunnel/hive/storage/Storage.java |  25 +++
 .../seatunnel/hive/storage/StorageFactory.java     |  32 ++++
 .../seatunnel/hive/storage/StorageType.java        |  25 +++
 .../seatunnel/hive/storage/CosStorageTest.java     |  77 +++++++++
 .../seatunnel/hive/storage/HDFSStorageTest.java    |  40 +++++
 .../seatunnel/hive/storage/OSSStorageTest.java     |  74 +++++++++
 .../seatunnel/hive/storage/S3StorageTest.java      | 106 ++++++++++++
 .../seatunnel/hive/storage/StorageFactoryTest.java |  54 +++++++
 .../src/test/resources/cos/core-site.xml           |  47 ++++++
 .../src/test/resources/oss/core-site.xml           |  30 ++++
 .../src/test/resources/s3/core-site.xml            |  50 ++++++
 .../connector-hive-e2e/pom.xml                     |  50 ++++++
 .../seatunnel/e2e/connector/hive/HiveIT.java       | 169 ++++++++++++++++++++
 .../src/test/resources/fake_to_hive_on_cos.conf    |  62 ++++++++
 .../src/test/resources/fake_to_hive_on_hdfs.conf   |  59 +++++++
 .../src/test/resources/fake_to_hive_on_oss.conf    |  62 ++++++++
 .../src/test/resources/fake_to_hive_on_s3.conf     |  62 ++++++++
 .../src/test/resources/hive_on_cos_to_assert.conf  |  76 +++++++++
 .../src/test/resources/hive_on_hdfs_to_assert.conf |  73 +++++++++
 .../src/test/resources/hive_on_oss_to_assert.conf  |  76 +++++++++
 .../src/test/resources/hive_on_s3_to_assert.conf   |  76 +++++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 .../container/seatunnel/SeaTunnelContainer.java    |   6 +-
 40 files changed, 1992 insertions(+), 25 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md 
b/docs/en/connector-v2/sink/Hive.md
index eec92b46b1..48a0ba41bd 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -37,6 +37,8 @@ By default, we use 2PC commit to ensure `exactly-once`
 | compress_codec                | string  | no       | none           |
 | hdfs_site_path                | string  | no       | -              |
 | hive_site_path                | string  | no       | -              |
+| hive.hadoop.conf              | Map     | no       | -              |
+| hive.hadoop.conf-path         | string  | no       | -              |
 | krb5_path                     | string  | no       | /etc/krb5.conf |
 | kerberos_principal            | string  | no       | -              |
 | kerberos_keytab_path          | string  | no       | -              |
@@ -57,6 +59,16 @@ The path of `hdfs-site.xml`, used to load ha configuration 
of namenodes
 
 ### hive_site_path [string]
 
+The path of `hive-site.xml`
+
+### hive.hadoop.conf [map]
+
+Properties in the Hadoop conf ('core-site.xml', 'hdfs-site.xml', 'hive-site.xml')
+
+### hive.hadoop.conf-path [string]
+
+The path from which the 'core-site.xml', 'hdfs-site.xml', 
'hive-site.xml' files are loaded
+
 ### krb5_path [string]
 
 The path of `krb5.conf`, used to authentication kerberos
@@ -162,6 +174,171 @@ sink {
   Hive {
     table_name = "test_hive.test_hive_sink_text_simple"
     metastore_uri = "thrift://ctyun7:9083"
+    hive.hadoop.conf = {
+      bucket = "s3a://mybucket"
+    }
+}
+```
+
+## Hive on s3
+
+### Step 1
+
+Create the lib directory for Hive on EMR.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Download the jars from Maven Central into the lib directory.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget 
https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
+wget 
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your EMR environment to the lib directory.
+
+```shell
+cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar 
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar 
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar 
${SEATUNNEL_HOME}/plugins/Hive/lib
+cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar 
${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_s3"
+    metastore_uri = 
"thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+    hive.hadoop.conf = {
+       bucket="s3://ws-package"
+    }
+  }
+}
+```
+
+## Hive on oss
+
+### Step 1
+
+Create the lib directory for Hive on EMR.
+
+```shell
+mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
+```
+
+### Step 2
+
+Download the jars from Maven Central into the lib directory.
+
+```shell
+cd ${SEATUNNEL_HOME}/plugins/Hive/lib
+wget 
https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
+```
+
+### Step 3
+
+Copy the jars from your EMR environment to the lib directory and delete the 
conflicting jar.
+
+```shell
+cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar 
${SEATUNNEL_HOME}/plugins/Hive/lib
+rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
+```
+
+### Step 4
+
+Run the case.
+
+```shell
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_oss"
+    metastore_uri = 
"thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+    }
   }
 }
 ```
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index 763691a463..dd4aaf943b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -29,6 +29,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
+import java.util.Objects;
+
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 
 public abstract class BaseHdfsFileSink extends BaseFileSink {
@@ -44,7 +46,11 @@ public abstract class BaseHdfsFileSink extends BaseFileSink {
                             getPluginName(), PluginType.SINK, 
result.getMsg()));
         }
         super.prepare(pluginConfig);
-        hadoopConf = new 
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
+        // Avoid overwriting a hadoopConf already initialized by a subclass;
+        // the default one is created here only when none has been set.
+        if (Objects.isNull(hadoopConf)) {
+            hadoopConf = new 
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
+        }
         if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
             
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
         }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 75fbd04e68..78f3147b17 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
 
 import java.io.IOException;
+import java.util.Objects;
 
 public abstract class BaseHdfsFileSource extends BaseFileSource {
 
@@ -56,8 +57,13 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         String path = 
pluginConfig.getString(HdfsSourceConfigOptions.FILE_PATH.key());
-        hadoopConf =
-                new 
HadoopConf(pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
+        // Avoid overwriting a hadoopConf already initialized by a subclass;
+        // the default one is created here only when none has been set.
+        if (Objects.isNull(hadoopConf)) {
+            hadoopConf =
+                    new HadoopConf(
+                            
pluginConfig.getString(HdfsSourceConfigOptions.DEFAULT_FS.key()));
+        }
         if 
(pluginConfig.hasPath(HdfsSourceConfigOptions.HDFS_SITE_PATH.key())) {
             hadoopConf.setHdfsSitePath(
                     
pluginConfig.getString(HdfsSourceConfigOptions.HDFS_SITE_PATH.key()));
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index 11bbe4d3ab..4e6499f4de 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -60,22 +60,50 @@ public class HadoopConf implements Serializable {
 
     public void setExtraOptionsForConfiguration(Configuration configuration) {
         if (!extraOptions.isEmpty()) {
+            removeUnwantedOverwritingProps(extraOptions);
             extraOptions.forEach(configuration::set);
         }
         if (hdfsSitePath != null) {
-            configuration.addResource(new Path(hdfsSitePath));
+            Configuration hdfsSiteConfiguration = new Configuration();
+            hdfsSiteConfiguration.addResource(new Path(hdfsSitePath));
+            unsetUnwantedOverwritingProps(hdfsSiteConfiguration);
+            configuration.addResource(hdfsSiteConfiguration);
         }
     }
 
+    private void removeUnwantedOverwritingProps(Map extraOptions) {
+        extraOptions.remove(getFsDefaultNameKey());
+        extraOptions.remove(getHdfsImplKey());
+        extraOptions.remove(getHdfsImplDisableCacheKey());
+    }
+
+    public void unsetUnwantedOverwritingProps(Configuration 
hdfsSiteConfiguration) {
+        hdfsSiteConfiguration.unset(getFsDefaultNameKey());
+        hdfsSiteConfiguration.unset(getHdfsImplKey());
+        hdfsSiteConfiguration.unset(getHdfsImplDisableCacheKey());
+    }
+
     public Configuration toConfiguration() {
         Configuration configuration = new Configuration();
         configuration.setBoolean(READ_INT96_AS_FIXED, true);
         configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
         configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
         configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
-        configuration.setBoolean(String.format("fs.%s.impl.disable.cache", 
getSchema()), true);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, 
getHdfsNameKey());
-        configuration.set(String.format("fs.%s.impl", getSchema()), 
getFsHdfsImpl());
+        configuration.setBoolean(getHdfsImplDisableCacheKey(), true);
+        configuration.set(getFsDefaultNameKey(), getHdfsNameKey());
+        configuration.set(getHdfsImplKey(), getFsHdfsImpl());
         return configuration;
     }
+
+    public String getFsDefaultNameKey() {
+        return CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
+    }
+
+    public String getHdfsImplKey() {
+        return String.format("fs.%s.impl", getSchema());
+    }
+
+    public String getHdfsImplDisableCacheKey() {
+        return String.format("fs.%s.impl.disable.cache", getSchema());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
index cc35141866..2680ce151c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
@@ -29,8 +29,8 @@ import java.util.Map;
 public class S3Conf extends HadoopConf {
     private static final String HDFS_S3N_IMPL = 
"org.apache.hadoop.fs.s3native.NativeS3FileSystem";
     private static final String HDFS_S3A_IMPL = 
"org.apache.hadoop.fs.s3a.S3AFileSystem";
-    private static final String S3A_SCHEMA = "s3a";
-    private static final String DEFAULT_SCHEMA = "s3n";
+    protected static final String S3A_SCHEMA = "s3a";
+    protected static final String DEFAULT_SCHEMA = "s3n";
     private String schema = DEFAULT_SCHEMA;
 
     @Override
@@ -47,7 +47,7 @@ public class S3Conf extends HadoopConf {
         this.schema = schema;
     }
 
-    private S3Conf(String hdfsNameKey) {
+    protected S3Conf(String hdfsNameKey) {
         super(hdfsNameKey);
     }
 
@@ -80,7 +80,7 @@ public class S3Conf extends HadoopConf {
         return buildWithConfig(config);
     }
 
-    private String switchHdfsImpl() {
+    protected String switchHdfsImpl() {
         switch (this.schema) {
             case S3A_SCHEMA:
                 return HDFS_S3A_IMPL;
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml 
b/seatunnel-connectors-v2/connector-hive/pom.xml
index 7744a441bf..aef3c4451a 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -46,6 +46,21 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-s3</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-oss</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-cos</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index 8cf00b8c30..0afadc64d8 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -26,6 +26,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.metastore.api.Table;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class HiveConfig {
     public static final Option<String> TABLE_NAME =
             Options.key("table_name")
@@ -51,6 +54,19 @@ public class HiveConfig {
                     .noDefaultValue()
                     .withDescription("The path of hive-site.xml");
 
+    public static final Option<Map<String, String>> HADOOP_CONF =
+            Options.key("hive.hadoop.conf")
+                    .mapType()
+                    .defaultValue(new HashMap<>())
+                    .withDescription("Properties in hadoop conf");
+
+    public static final Option<String> HADOOP_CONF_PATH =
+            Options.key("hive.hadoop.conf-path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The specified loading path for the 
'core-site.xml', 'hdfs-site.xml' files");
+
     public static final String TEXT_INPUT_FORMAT_CLASSNAME =
             "org.apache.hadoop.mapred.TextInputFormat";
     public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOnS3Conf.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOnS3Conf.java
new file mode 100644
index 0000000000..01fede7517
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOnS3Conf.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+
+public class HiveOnS3Conf extends S3Conf {
+    protected static final String S3_SCHEMA = "s3";
+    // The emr of amazon on s3 use this EmrFileSystem as the file system
+    protected static final String HDFS_S3_IMPL = 
"com.amazon.ws.emr.hadoop.fs.EmrFileSystem";
+
+    protected HiveOnS3Conf(String hdfsNameKey, String schema) {
+        super(hdfsNameKey);
+        setSchema(schema);
+    }
+
+    @Override
+    public String getFsHdfsImpl() {
+        return switchHdfsImpl();
+    }
+
+    @Override
+    protected String switchHdfsImpl() {
+        return getSchema().equals(S3_SCHEMA) ? HDFS_S3_IMPL : 
super.switchHdfsImpl();
+    }
+
+    public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig 
readonlyConfig) {
+        S3Conf s3Conf = (S3Conf) 
S3Conf.buildWithReadOnlyConfig(readonlyConfig);
+        String bucketName = readonlyConfig.get(S3ConfigOptions.S3_BUCKET);
+        if (bucketName.startsWith(DEFAULT_SCHEMA)) {
+            s3Conf.setSchema(DEFAULT_SCHEMA);
+        } else if (bucketName.startsWith(S3A_SCHEMA)) {
+            s3Conf.setSchema(S3A_SCHEMA);
+        } else {
+            s3Conf.setSchema(S3_SCHEMA);
+        }
+        return new HiveOnS3Conf(s3Conf.getHdfsNameKey(), s3Conf.getSchema());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 1c208b47da..078ce83fd1 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -22,6 +22,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
@@ -29,7 +30,6 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
@@ -37,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedC
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -177,11 +178,19 @@ public class HiveSink extends BaseHdfsFileSink {
                         .withValue(SINK_COLUMNS.key(), 
ConfigValueFactory.fromAnyRef(sinkFields))
                         .withValue(
                                 PARTITION_BY.key(), 
ConfigValueFactory.fromAnyRef(partitionKeys));
-        String hdfsLocation = tableInformation.getSd().getLocation();
+        String hiveSdLocation = tableInformation.getSd().getLocation();
         try {
-            URI uri = new URI(hdfsLocation);
-            String path = uri.getPath();
-            hadoopConf = new HadoopConf(hdfsLocation.replace(path, ""));
+            /**
+             * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop 
conf can be
+             * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy 
can obtain the
+             * correct Schema and FsHdfsImpl that can be filled into hadoop 
configuration in {@link
+             * 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+             */
+            hadoopConf =
+                    StorageFactory.getStorageType(hiveSdLocation)
+                            .buildHadoopConfWithReadOnlyConfig(
+                                    ReadonlyConfig.fromConfig(pluginConfig));
+            String path = new URI(hiveSdLocation).getPath();
             pluginConfig =
                     pluginConfig
                             .withValue(FILE_PATH.key(), 
ConfigValueFactory.fromAnyRef(path))
@@ -193,7 +202,7 @@ public class HiveSink extends BaseHdfsFileSink {
                     String.format(
                             "Get hdfs namenode host from table location [%s] 
failed,"
                                     + "please check it",
-                            hdfsLocation);
+                            hiveSdLocation);
             throw new HiveConnectorException(
                     HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, 
errorMsg, e);
         }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index b98f6cffa5..e40864517f 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -24,8 +24,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
-
 @AutoService(Factory.class)
 public class HiveSinkFactory implements TableSinkFactory {
     @Override
@@ -38,7 +36,9 @@ public class HiveSinkFactory implements TableSinkFactory {
         return OptionRule.builder()
                 .required(HiveConfig.TABLE_NAME)
                 .required(HiveConfig.METASTORE_URI)
-                .optional(ABORT_DROP_PARTITION_METADATA)
+                .optional(HiveConfig.ABORT_DROP_PARTITION_METADATA)
+                .optional(HiveConfig.HADOOP_CONF)
+                .optional(HiveConfig.HADOOP_CONF_PATH)
                 .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 071919ea95..f7642e3611 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -25,6 +25,7 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 import org.apache.seatunnel.api.table.type.SqlType;
@@ -40,6 +41,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSo
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -153,11 +155,11 @@ public class HiveSource extends BaseHdfsFileSource {
                     CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                     "Hive connector only support [text parquet orc] table 
now");
         }
-        String hdfsLocation = tableInformation.getSd().getLocation();
+        String hiveSdLocation = tableInformation.getSd().getLocation();
         try {
-            URI uri = new URI(hdfsLocation);
+            URI uri = new URI(hiveSdLocation);
             String path = uri.getPath();
-            String defaultFs = hdfsLocation.replace(path, "");
+            String defaultFs = hiveSdLocation.replace(path, "");
             pluginConfig =
                     pluginConfig
                             .withValue(
@@ -165,12 +167,22 @@ public class HiveSource extends BaseHdfsFileSource {
                                     ConfigValueFactory.fromAnyRef(path))
                             .withValue(
                                     FS_DEFAULT_NAME_KEY, 
ConfigValueFactory.fromAnyRef(defaultFs));
+            /**
+             * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop 
conf can be
+             * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy 
can obtain the
+             * correct Schema and FsHdfsImpl that can be filled into hadoop 
configuration in {@link
+             * 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+             */
+            hadoopConf =
+                    StorageFactory.getStorageType(hiveSdLocation)
+                            .buildHadoopConfWithReadOnlyConfig(
+                                    ReadonlyConfig.fromConfig(pluginConfig));
         } catch (URISyntaxException e) {
             String errorMsg =
                     String.format(
                             "Get hdfs namenode host from table location [%s] 
failed,"
                                     + "please check it",
-                            hdfsLocation);
+                            hiveSdLocation);
             throw new HiveConnectorException(
                     HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, 
errorMsg, e);
         }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index 385738b2e8..afa1ae0e36 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -40,6 +40,8 @@ public class HiveSourceFactory implements TableSourceFactory {
                 .required(HiveConfig.METASTORE_URI)
                 .optional(BaseSourceConfigOptions.READ_PARTITIONS)
                 .optional(BaseSourceConfigOptions.READ_COLUMNS)
+                .optional(HiveConfig.HADOOP_CONF)
+                .optional(HiveConfig.HADOOP_CONF_PATH)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
new file mode 100644
index 0000000000..0e61226b1e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/AbstractStorage.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.ImmutableList;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Base class for the Hive storage back-ends. It loads the Hadoop configuration that the user
+ * supplied for the Hive connector and offers a helper to propagate the bucket property into the
+ * connector config.
+ */
+@Slf4j
+public abstract class AbstractStorage implements Storage {
+    /** Key of the bucket property; read from the Hadoop configuration, written to the config. */
+    private static final Option<String> BUCKET_OPTION =
+            Options.key("bucket").stringType().noDefaultValue();
+
+    /** Site files that are looked up inside the directory given by the conf-path option. */
+    private static final List<String> HADOOP_CONF_FILES =
+            ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
+
+    /**
+     * Copies the bucket property from the Hadoop configuration into the connector config.
+     *
+     * @param readonlyConfig connector options; converted to a {@link Config} that is returned
+     *     with the bucket added
+     * @param configuration Hadoop configuration expected to contain the bucket property
+     * @return the connector config with the bucket property set
+     * @throws RuntimeException if the bucket property is missing or blank
+     */
+    protected Config fillBucket(ReadonlyConfig readonlyConfig, Configuration configuration) {
+        Config config = readonlyConfig.toConfig();
+        String bucketValue = configuration.get(BUCKET_OPTION.key());
+        if (StringUtils.isBlank(bucketValue)) {
+            throw new RuntimeException(
+                    "There is no bucket property in conf which load from [hadoop_conf_path,hadoop_conf].");
+        }
+        return config.withValue(BUCKET_OPTION.key(), ConfigValueFactory.fromAnyRef(bucketValue));
+    }
+
+    /**
+     * Loads the Hadoop configuration from the directory given by {@link
+     * HiveConfig#HADOOP_CONF_PATH} and from the properties given by {@link
+     * HiveConfig#HADOOP_CONF}.
+     *
+     * <p>Site files that do not exist are skipped; a file whose location cannot be turned into a
+     * URL is logged and skipped. The explicit properties are applied after the files.
+     *
+     * @param readonlyConfig connector options that may carry the two Hadoop options
+     * @return a new Hadoop configuration populated from the sources that were present
+     */
+    protected Configuration loadHiveBaseHadoopConfig(ReadonlyConfig readonlyConfig) {
+        Configuration configuration = new Configuration();
+        // Try to load from hadoop_conf_path (the bucket is typically configured in core-site.xml)
+        Optional<String> hadoopConfPath = readonlyConfig.getOptional(HiveConfig.HADOOP_CONF_PATH);
+        if (hadoopConfPath.isPresent()) {
+            for (String confFile : HADOOP_CONF_FILES) {
+                java.nio.file.Path path = Paths.get(hadoopConfPath.get(), confFile);
+                if (!Files.exists(path)) {
+                    continue;
+                }
+                try {
+                    configuration.addResource(path.toUri().toURL());
+                } catch (IOException e) {
+                    log.warn("Error adding Hadoop resource {}, resource was not added", path, e);
+                }
+            }
+        }
+        // Try to load from hadoopConf
+        Optional<Map<String, String>> hadoopConf =
+                readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
+        hadoopConf.ifPresent(props -> props.forEach(configuration::set));
+        return configuration;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/COSStorage.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/COSStorage.java
new file mode 100644
index 0000000000..0b2976424c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/COSStorage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfigOptions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** Storage implementation used for Hive tables whose data lives on COS. */
+public class COSStorage extends AbstractStorage {
+    /**
+     * Builds the COS Hadoop conf: the bucket, secret id, secret key and region are taken from the
+     * loaded Hadoop configuration, and every loaded property is attached as an extra option.
+     *
+     * @param readonlyConfig connector options
+     * @return the conf produced by {@link CosConf#buildWithConfig}
+     */
+    @Override
+    public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        Configuration hadoopConfiguration = loadHiveBaseHadoopConfig(readonlyConfig);
+        Config cosConfig = fillBucket(readonlyConfig, hadoopConfiguration);
+        cosConfig = copyProperty(cosConfig, hadoopConfiguration, CosConfigOptions.SECRET_ID.key());
+        cosConfig = copyProperty(cosConfig, hadoopConfiguration, CosConfigOptions.SECRET_KEY.key());
+        cosConfig = copyProperty(cosConfig, hadoopConfiguration, CosConfigOptions.REGION.key());
+
+        HadoopConf cosConf = CosConf.buildWithConfig(cosConfig);
+        Map<String, String> allHadoopProps =
+                hadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY);
+        cosConf.setExtraOptions(allHadoopProps);
+        return cosConf;
+    }
+
+    /** Returns {@code target} with {@code key} set to the value {@code source} holds for it. */
+    private static Config copyProperty(Config target, Configuration source, String key) {
+        return target.withValue(key, ConfigValueFactory.fromAnyRef(source.get(key)));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorage.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorage.java
new file mode 100644
index 0000000000..d4415e96f6
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+/** Storage implementation used for Hive tables whose data lives on HDFS. */
+public class HDFSStorage extends AbstractStorage {
+
+    /** Table location as stored in the Hive storage descriptor. */
+    private final String hiveSdLocation;
+
+    public HDFSStorage(String hiveSdLocation) {
+        this.hiveSdLocation = hiveSdLocation;
+    }
+
+    /**
+     * Builds the HDFS Hadoop conf. The default file system is the table location with its
+     * trailing path removed; every loaded Hadoop property is attached as an extra option.
+     *
+     * @param readonlyConfig connector options
+     * @return the Hadoop conf for the table location
+     * @throws HiveConnectorException if the table location is not a valid URI
+     */
+    @Override
+    public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        try {
+            String rawPath = new URI(hiveSdLocation).getRawPath();
+            // Strip the path only where it actually sits, at the end of the location.
+            // String#replace would also delete earlier occurrences, e.g. it turns
+            // "hdfs://nn:8020/" into "hdfs:nn:8020". removeEnd is also null-safe, which
+            // matters for opaque URIs whose path is null.
+            String defaultFs = StringUtils.removeEnd(hiveSdLocation, rawPath);
+            HadoopConf hadoopConf = new HadoopConf(defaultFs);
+            Configuration configuration = loadHiveBaseHadoopConfig(readonlyConfig);
+            Map<String, String> propsInConfiguration =
+                    configuration.getPropsWithPrefix(StringUtils.EMPTY);
+            hadoopConf.setExtraOptions(propsInConfiguration);
+            return hadoopConf;
+        } catch (URISyntaxException e) {
+            String errorMsg =
+                    String.format(
+                            "Get hdfs namenode host from table location [%s] failed,"
+                                    + "please check it",
+                            hiveSdLocation);
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, errorMsg, e);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorage.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorage.java
new file mode 100644
index 0000000000..eb3ce67851
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssHadoopConf;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** Storage implementation used for Hive tables whose data lives on OSS. */
+public class OSSStorage extends AbstractStorage {
+
+    /**
+     * Builds the OSS Hadoop conf: the bucket is taken from the loaded Hadoop configuration, and
+     * every loaded property is attached as an extra option.
+     *
+     * @param readonlyConfig connector options
+     * @return the conf produced by {@link OssHadoopConf#buildWithConfig}
+     */
+    @Override
+    public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        Configuration hadoopConfiguration = loadHiveBaseHadoopConfig(readonlyConfig);
+        Config configWithBucket = fillBucket(readonlyConfig, hadoopConfiguration);
+        ReadonlyConfig ossOptions = ReadonlyConfig.fromConfig(configWithBucket);
+
+        HadoopConf ossConf = OssHadoopConf.buildWithConfig(ossOptions);
+        Map<String, String> allHadoopProps =
+                hadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY);
+        ossConf.setExtraOptions(allHadoopProps);
+        return ossConf;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3Storage.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3Storage.java
new file mode 100644
index 0000000000..877d14dfc4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3Storage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOnS3Conf;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** Storage implementation used for Hive tables whose data lives on S3. */
+public class S3Storage extends AbstractStorage {
+
+    /**
+     * Builds the S3 Hadoop conf: the bucket, credentials provider and endpoint are taken from the
+     * loaded Hadoop configuration, and every loaded property is attached as an extra option.
+     *
+     * @param readonlyConfig connector options
+     * @return the conf produced by {@link HiveOnS3Conf#buildWithReadOnlyConfig}
+     */
+    @Override
+    public HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
+        Configuration hadoopConfiguration = loadHiveBaseHadoopConfig(readonlyConfig);
+        Config s3Config = fillBucket(readonlyConfig, hadoopConfiguration);
+        s3Config =
+                copyProperty(
+                        s3Config,
+                        hadoopConfiguration,
+                        S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key());
+        s3Config =
+                copyProperty(s3Config, hadoopConfiguration, S3ConfigOptions.FS_S3A_ENDPOINT.key());
+
+        HadoopConf s3Conf = HiveOnS3Conf.buildWithReadOnlyConfig(ReadonlyConfig.fromConfig(s3Config));
+        Map<String, String> allHadoopProps =
+                hadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY);
+        s3Conf.setExtraOptions(allHadoopProps);
+        return s3Conf;
+    }
+
+    /** Returns {@code target} with {@code key} set to the value {@code source} holds for it. */
+    private static Config copyProperty(Config target, Configuration source, String key) {
+        return target.withValue(key, ConfigValueFactory.fromAnyRef(source.get(key)));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/Storage.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/Storage.java
new file mode 100644
index 0000000000..24df3e76d5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/Storage.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+/** A file system back-end that can hold the data of a Hive table. */
+public interface Storage {
+    /**
+     * Builds the Hadoop conf for this back-end from the connector options.
+     *
+     * @param readonlyConfig connector options
+     * @return the Hadoop conf to use for this back-end
+     */
+    HadoopConf buildHadoopConfWithReadOnlyConfig(ReadonlyConfig 
readonlyConfig);
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactory.java
new file mode 100644
index 0000000000..8a08f2be97
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+/** Chooses the {@link Storage} implementation that matches a Hive table location. */
+public class StorageFactory {
+    /**
+     * Picks the storage by the prefix of the table location: the lower-cased names of {@link
+     * StorageType#S3}, {@link StorageType#OSS} and {@link StorageType#COS} are tried in that
+     * order, and any other location is treated as HDFS.
+     *
+     * @param hiveSdLocation table location from the Hive storage descriptor
+     * @return the storage for that location
+     */
+    public static Storage getStorageType(String hiveSdLocation) {
+        if (hasPrefix(hiveSdLocation, StorageType.S3)) {
+            return new S3Storage();
+        }
+        if (hasPrefix(hiveSdLocation, StorageType.OSS)) {
+            return new OSSStorage();
+        }
+        if (hasPrefix(hiveSdLocation, StorageType.COS)) {
+            return new COSStorage();
+        }
+        return new HDFSStorage(hiveSdLocation);
+    }
+
+    /** Tells whether the location starts with the lower-cased name of the given type. */
+    private static boolean hasPrefix(String location, StorageType type) {
+        return location.startsWith(type.name().toLowerCase());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageType.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageType.java
new file mode 100644
index 0000000000..49a7601977
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageType.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+/**
+ * File systems a Hive table location can point at. StorageFactory matches the lower-cased
+ * constant name against the start of the table location.
+ */
+public enum StorageType {
+    S3,
+    OSS,
+    COS,
+    // Not matched by prefix in StorageFactory; HDFSStorage is its fallback for other locations.
+    HDFS
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/CosStorageTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/CosStorageTest.java
new file mode 100644
index 0000000000..a8f15b4b91
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/CosStorageTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConfigOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+
+/** Checks that {@link COSStorage} produces a COS Hadoop conf from either configuration source. */
+public class CosStorageTest {
+
+    /** Connector options carrying the COS properties inline under {@code hive.hadoop.conf}. */
+    private static final ReadonlyConfig COS = buildCosConfig();
+
+    private static ReadonlyConfig buildCosConfig() {
+        HashMap<String, String> hadoopConf = new HashMap<>();
+        hadoopConf.put("bucket", "cosn://my_bucket");
+        hadoopConf.put(CosConfigOptions.SECRET_ID.key(), "test");
+        hadoopConf.put(CosConfigOptions.SECRET_KEY.key(), "test");
+        hadoopConf.put(CosConfigOptions.REGION.key(), "ap-shanghai");
+
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("hive.hadoop.conf", hadoopConf);
+        return ReadonlyConfig.fromMap(options);
+    }
+
+    @Test
+    void fillBucketInHadoopConf() {
+        COSStorage cosStorage = new COSStorage();
+        HadoopConf cosnConf = cosStorage.buildHadoopConfWithReadOnlyConfig(COS);
+        assertHadoopConf(cosnConf);
+    }
+
+    @Test
+    void fillBucketInHadoopConfPath() throws URISyntaxException {
+        URL resource = CosStorageTest.class.getResource("/cos");
+        String filePath = Paths.get(resource.toURI()).toString();
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("hive.hadoop.conf-path", filePath);
+        map.putAll(COS.toMap());
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+        COSStorage cosStorage = new COSStorage();
+        HadoopConf hadoopConf = cosStorage.buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        assertHadoopConf(hadoopConf);
+    }
+
+    private static void assertHadoopConf(HadoopConf cosnConf) {
+        Assertions.assertTrue(cosnConf instanceof CosConf);
+        // JUnit expects (expected, actual); the reverse order yields misleading failure messages.
+        Assertions.assertEquals("cosn", cosnConf.getSchema());
+        Assertions.assertEquals("org.apache.hadoop.fs.CosFileSystem", cosnConf.getFsHdfsImpl());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorageTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorageTest.java
new file mode 100644
index 0000000000..da94a24537
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/HDFSStorageTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+
+/** Checks that {@link HDFSStorage} produces an HDFS Hadoop conf for an hdfs location. */
+public class HDFSStorageTest {
+
+    @Test
+    void fillbuildHadoopConfWithReadOnlyConfig() {
+        HDFSStorage hdfsStorage = new HDFSStorage("hdfs://tmp/test");
+        HadoopConf hadoopConf =
+                hdfsStorage.buildHadoopConfWithReadOnlyConfig(
+                        ReadonlyConfig.fromMap(new HashMap<>(0)));
+        // JUnit expects (expected, actual); the reverse order yields misleading failure messages.
+        Assertions.assertEquals("hdfs", hadoopConf.getSchema());
+        Assertions.assertEquals(
+                "org.apache.hadoop.hdfs.DistributedFileSystem", hadoopConf.getFsHdfsImpl());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorageTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorageTest.java
new file mode 100644
index 0000000000..488fd33f98
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/OSSStorageTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssHadoopConf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+
+/** Checks that {@link OSSStorage} produces an OSS Hadoop conf from either configuration source. */
+public class OSSStorageTest {
+
+    /** Connector options carrying the bucket inline under {@code hive.hadoop.conf}. */
+    private static final ReadonlyConfig OSS = buildOssConfig();
+
+    private static ReadonlyConfig buildOssConfig() {
+        HashMap<String, String> hadoopConf = new HashMap<>();
+        hadoopConf.put("bucket", "oss://my_bucket");
+
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("hive.hadoop.conf", hadoopConf);
+        return ReadonlyConfig.fromMap(options);
+    }
+
+    @Test
+    void fillBucketInHadoopConf() {
+        OSSStorage ossStorage = new OSSStorage();
+        HadoopConf ossnConf = ossStorage.buildHadoopConfWithReadOnlyConfig(OSS);
+        assertHadoopConf(ossnConf);
+    }
+
+    @Test
+    void fillBucketInHadoopConfPath() throws URISyntaxException {
+        URL resource = OSSStorageTest.class.getResource("/oss");
+        String filePath = Paths.get(resource.toURI()).toString();
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("hive.hadoop.conf-path", filePath);
+        map.putAll(OSS.toMap());
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+        OSSStorage ossStorage = new OSSStorage();
+        HadoopConf hadoopConf = ossStorage.buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        assertHadoopConf(hadoopConf);
+    }
+
+    private void assertHadoopConf(HadoopConf ossnConf) {
+        Assertions.assertTrue(ossnConf instanceof OssHadoopConf);
+        // JUnit expects (expected, actual); the reverse order yields misleading failure messages.
+        Assertions.assertEquals("oss", ossnConf.getSchema());
+        Assertions.assertEquals(
+                "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem", ossnConf.getFsHdfsImpl());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3StorageTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3StorageTest.java
new file mode 100644
index 0000000000..52edf0fb4d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/S3StorageTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3ConfigOptions;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOnS3Conf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.HashMap;
+
+/** Tests that {@link S3Storage} derives the right Hadoop conf for {@code s3a} and {@code s3}. */
+public class S3StorageTest {
+
+    /** Inline {@code hive.hadoop.conf} options for a bucket addressed with the s3a scheme. */
+    private static final ReadonlyConfig S3A =
+            hiveHadoopConf(
+                    "s3a://my_bucket", "provider", "http://s3.ap-northeast-1.amazonaws.com");
+
+    /** Inline {@code hive.hadoop.conf} options for a bucket addressed with the s3 scheme. */
+    private static final ReadonlyConfig S3 =
+            hiveHadoopConf("s3://my_bucket", "testProvider", "test");
+
+    /**
+     * Builds a config whose only entry is {@code hive.hadoop.conf}, holding the S3 bucket,
+     * credentials provider and endpoint options.
+     *
+     * <p>Plain maps are used instead of double-brace initialization so that no anonymous
+     * {@code HashMap} subclasses are created for the fixtures.
+     *
+     * @param bucket bucket URI including its scheme, e.g. {@code s3a://my_bucket}
+     * @param credentialsProvider value for the S3A credentials provider option
+     * @param endpoint value for the S3A endpoint option
+     * @return read-only config wrapping the nested Hadoop options
+     */
+    private static ReadonlyConfig hiveHadoopConf(
+            String bucket, String credentialsProvider, String endpoint) {
+        HashMap<String, String> hadoopConf = new HashMap<>();
+        hadoopConf.put(S3ConfigOptions.S3_BUCKET.key(), bucket);
+        hadoopConf.put(S3ConfigOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(), credentialsProvider);
+        hadoopConf.put(S3ConfigOptions.FS_S3A_ENDPOINT.key(), endpoint);
+
+        HashMap<String, Object> options = new HashMap<>();
+        options.put("hive.hadoop.conf", hadoopConf);
+        return ReadonlyConfig.fromMap(options);
+    }
+
+    /** Checks the conf built from the inline options for both the s3a and the s3 bucket. */
+    @Test
+    void fillBucketInHadoopConf() {
+        S3Storage s3Storage = new S3Storage();
+        HadoopConf s3aConf = s3Storage.buildHadoopConfWithReadOnlyConfig(S3A);
+        assertHadoopConfForS3a(s3aConf);
+
+        HadoopConf s3Conf = s3Storage.buildHadoopConfWithReadOnlyConfig(S3);
+        Assertions.assertTrue(s3Conf instanceof HiveOnS3Conf);
+        // JUnit's contract is (expected, actual); this order keeps failure messages truthful.
+        Assertions.assertEquals("s3", s3Conf.getSchema());
+        Assertions.assertEquals(
+                "com.amazon.ws.emr.hadoop.fs.EmrFileSystem", s3Conf.getFsHdfsImpl());
+    }
+
+    /** Checks the conf built when {@code hive.hadoop.conf-path} is set next to the s3a options. */
+    @Test
+    void fillBucketInHadoopConfPath() throws URISyntaxException {
+        URL resource = S3StorageTest.class.getResource("/s3");
+        // Fail with a readable message instead of an NPE when the test resource is missing.
+        Assertions.assertNotNull(resource, "test resource directory /s3 not found on classpath");
+        String filePath = Paths.get(resource.toURI()).toString();
+        HashMap<String, Object> map = new HashMap<>();
+        map.put("hive.hadoop.conf-path", filePath);
+        map.putAll(S3A.toMap());
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+        S3Storage s3Storage = new S3Storage();
+        HadoopConf hadoopConf = s3Storage.buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        assertHadoopConfForS3a(hadoopConf);
+    }
+
+    /**
+     * Verifies that the given conf is a {@link HiveOnS3Conf} with schema {@code s3a} and the
+     * S3A file system implementation class.
+     *
+     * @param s3aConf Hadoop conf produced by {@link S3Storage}
+     */
+    private void assertHadoopConfForS3a(HadoopConf s3aConf) {
+        Assertions.assertTrue(s3aConf instanceof HiveOnS3Conf);
+        Assertions.assertEquals("s3a", s3aConf.getSchema());
+        Assertions.assertEquals("org.apache.hadoop.fs.s3a.S3AFileSystem", s3aConf.getFsHdfsImpl());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactoryTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactoryTest.java
new file mode 100644
index 0000000000..cd4d99bfaa
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/storage/StorageFactoryTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.storage;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Checks that {@link StorageFactory} maps table locations to the matching {@link Storage}. */
+public class StorageFactoryTest {
+
+    /** Sample table locations mapped to the storage implementation expected for each. */
+    private static final Map<String, Class<? extends Storage>> STORAGE_MAP = new HashMap<>();
+
+    // Populated in a static block rather than through a raw, double-brace-initialized HashMap,
+    // so the map stays fully typed and compiles without unchecked warnings.
+    static {
+        STORAGE_MAP.put("hdfs://path/to/", HDFSStorage.class);
+        STORAGE_MAP.put("s3n://path/to/", S3Storage.class);
+        STORAGE_MAP.put("s3://ws-package/hive/test_hive.db/test_hive_sink_on_s3", S3Storage.class);
+        STORAGE_MAP.put("s3a://path/to/", S3Storage.class);
+        STORAGE_MAP.put("oss://path/to/", OSSStorage.class);
+        STORAGE_MAP.put("cosn://path/to/", COSStorage.class);
+    }
+
+    /** Resolves every sample location and checks the type of the returned storage. */
+    @Test
+    void testStorageType() {
+        STORAGE_MAP.forEach(
+                (location, expectedStorageClass) -> {
+                    Storage storage = StorageFactory.getStorageType(location);
+                    // Name the offending location so a failure is attributable to one entry.
+                    Assertions.assertNotNull(storage, "no storage resolved for " + location);
+                    Assertions.assertTrue(
+                            expectedStorageClass.isInstance(storage),
+                            "unexpected storage type for " + location);
+                });
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/resources/cos/core-site.xml 
b/seatunnel-connectors-v2/connector-hive/src/test/resources/cos/core-site.xml
new file mode 100644
index 0000000000..e8cd95dfd3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/resources/cos/core-site.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>cosn://mybucket</value>
+    </property>
+    <property>
+        <name>fs.cosn.impl</name>
+        <value>org.apache.hadoop.fs.CosNFileSystem</value>
+    </property>
+    <property>
+        <name>fs.AbstractFileSystem.cosn.impl</name>
+        <value>org.apache.hadoop.fs.CosN</value>
+    </property>
+    <property>
+        <name>fs.cosn.credentials.provider</name>
+        <value>org.apache.hadoop.fs.auth.SimpleCredentialProvider</value>
+    </property>
+    <property>
+        <name>secret_id</name>
+        <value>your-cosn-secret_id</value>
+    </property>
+    <property>
+        <name>secret_key</name>
+        <value>your-secret_key</value>
+    </property>
+    <property>
+        <name>region</name>
+        <value>your-region</value>
+    </property>
+</configuration>
+
+
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/resources/oss/core-site.xml 
b/seatunnel-connectors-v2/connector-hive/src/test/resources/oss/core-site.xml
new file mode 100644
index 0000000000..4ac8474938
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/resources/oss/core-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>oss://mybucket</value>
+    </property>
+    <property>
+        <name>fs.oss.accessKeyId</name>
+        <value>your-access-key-id</value>
+    </property>
+    <property>
+        <name>fs.oss.accessKeySecret</name>
+        <value>your-access-key-secret</value>
+    </property>
+</configuration>
+
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/resources/s3/core-site.xml 
b/seatunnel-connectors-v2/connector-hive/src/test/resources/s3/core-site.xml
new file mode 100644
index 0000000000..34a28151c2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/resources/s3/core-site.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<configuration>
+   <property>
+    <name>fs.defaultFS</name>
+    <value>s3a://mybucket</value>
+  </property>
+  <property>
+    <name>fs.s3a.access.key</name>
+    <value>*******</value>
+  </property>
+  <property>
+    <name>fs.s3a.secret.key</name>
+    <value>*******</value>
+  </property>
+  <property>
+    <name>fs.s3a.connection.ssl.enabled</name>
+    <value>false</value>
+  </property>
+  <property>
+    <name>fs.s3a.path.style.access</name>
+    <value>true</value>
+  </property>
+   <property>
+    <name>fs.s3a.endpoint</name>
+    <value>http://s3.ap-northeast-1.amazonaws.com</value>
+  </property>
+  <property>
+    <name>fs.s3a.impl</name>
+    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
+  </property>
+  <property>
+      <name>hadoop.tmp.dir</name>
+      <value>/hadoop/tmp</value>
+    <description>A base for other temporary directories.</description>
+  </property>
+</configuration>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/pom.xml
new file mode 100644
index 0000000000..bd9a112b6a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-hive-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Hive</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hive</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+            <version>${project.version}</version>
+            <classifier>optional</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
new file mode 100644
index 0000000000..fbe0b2503b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hive;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * End-to-end test that writes fake rows into Hive tables stored on HDFS, S3, OSS and COS and
+ * then runs a second job per storage that reads the table back.
+ *
+ * <p>The test is disabled for the Spark and Flink engine types and is {@code @Disabled}
+ * altogether because it needs externally provisioned storage. To run it, adapt
+ * {@link #HADOOP_HIVE_CONF_PATH_LOCAL} and the host entries written to {@code /etc/hosts}.
+ */
+@DisabledOnContainer(
+        value = {},
+        type = {EngineType.SPARK, EngineType.FLINK})
+@Disabled(
+        "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this test, "
+                + "please set up your own environment in the test case file, "
+                + "hadoop_hive_conf_path_local and ip below")
+@Slf4j
+public class HiveIT extends TestSuiteBase implements TestResource {
+    /** Host directory with the Hadoop/Hive conf files; environment specific, adjust locally. */
+    private static final String HADOOP_HIVE_CONF_PATH_LOCAL =
+            "/Users/dailai/software/hadoop-3.3.3/etc/hadoop";
+
+    /** Directory inside the test container that receives the copied conf files. */
+    private static final String HADOOP_HIVE_CONF_PATH_IN_CONTAINER = "/tmp/hadoop";
+
+    /** Download URL of the hive-exec jar. */
+    private String hiveExeUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar";
+    }
+
+    /** Download URL of the hadoop-aws jar (S3 support). */
+    private String hadoopAwsUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar";
+    }
+
+    /** Download URL of the Aliyun OSS SDK jar. */
+    private String aliyunSdkOssUrl() {
+        return "https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.4.1/aliyun-sdk-oss-3.4.1.jar";
+    }
+
+    /** Download URL of the jdom jar, fetched together with the OSS jars. */
+    private String jdomUrl() {
+        return "https://repo1.maven.org/maven2/org/jdom/jdom/1.1/jdom-1.1.jar";
+    }
+
+    /** Download URL of the hadoop-aliyun jar (OSS support). */
+    private String hadoopAliyunUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.1.4/hadoop-aliyun-3.1.4.jar";
+    }
+
+    /** Download URL of the hadoop-cos jar (COS support). */
+    private String hadoopCosUrl() {
+        return "https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
+    }
+
+    /**
+     * Prepares the engine container: registers the Hadoop hosts, copies the local Hadoop/Hive
+     * conf directory into the container and downloads the jars needed by the Hive plugin.
+     */
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                // NOTE(review): ${IP01}..${IP06} look like placeholders for the real host IPs;
+                // they have to be supplied before the test is enabled.
+                container.execInContainer("sh", "-c", "chmod -R 777 /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP01} hadoop01\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP02} hadoop02\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP03} hadoop03\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP04} hadoop04\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP05} hadoop05\" >> /etc/hosts");
+                container.execInContainer("sh", "-c", "echo \"${IP06} hadoop06\" >> /etc/hosts");
+                Assertions.assertTrue(
+                        new File(HADOOP_HIVE_CONF_PATH_LOCAL).exists(),
+                        HADOOP_HIVE_CONF_PATH_LOCAL + " must exist");
+                container.execInContainer(
+                        "sh", "-c", "mkdir -p " + HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
+                container.execInContainer(
+                        "sh", "-c", "chmod -R 777 " + HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
+                // Copy local hadoop conf and hive conf to the container
+                container.copyFileToContainer(
+                        MountableFile.forHostPath(HADOOP_HIVE_CONF_PATH_LOCAL),
+                        HADOOP_HIVE_CONF_PATH_IN_CONTAINER);
+
+                // The jar of hive-exec
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Hive/lib && cd /tmp/seatunnel/plugins/Hive/lib && wget "
+                                        + hiveExeUrl());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+                // The jar of s3
+                Container.ExecResult downloadS3Commands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "cd /tmp/seatunnel/plugins/Hive/lib && wget " + hadoopAwsUrl());
+                Assertions.assertEquals(
+                        0, downloadS3Commands.getExitCode(), downloadS3Commands.getStderr());
+                // The jar of oss
+                Container.ExecResult downloadOssCommands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "cd /tmp/seatunnel/plugins/Hive/lib && wget "
+                                        + aliyunSdkOssUrl()
+                                        + " && wget "
+                                        + jdomUrl()
+                                        + " && wget "
+                                        + hadoopAliyunUrl());
+                Assertions.assertEquals(
+                        0, downloadOssCommands.getExitCode(), downloadOssCommands.getStderr());
+                // The jar of cos
+                Container.ExecResult downloadCosCommands =
+                        container.execInContainer(
+                                "sh",
+                                "-c",
+                                "cd /tmp/seatunnel/plugins/Hive/lib && wget " + hadoopCosUrl());
+                Assertions.assertEquals(
+                        0, downloadCosCommands.getExitCode(), downloadCosCommands.getStderr());
+            };
+
+    /** No shared resources to start; the storage services are provisioned externally. */
+    @Override
+    public void startUp() throws Exception {}
+
+    /** Nothing to release. */
+    @Override
+    public void tearDown() throws Exception {}
+
+    /**
+     * Runs two jobs in sequence and requires both to exit with code 0.
+     *
+     * @param container engine container that executes the jobs
+     * @param job1 classpath location of the job that writes into Hive
+     * @param job2 classpath location of the job that reads the Hive table back
+     * @throws IOException if the container fails to run a job
+     * @throws InterruptedException if waiting for a job is interrupted
+     */
+    private void executeJob(TestContainer container, String job1, String job2)
+            throws IOException, InterruptedException {
+        // Report stderr on failure, in line with the setup assertions in extendedFactory.
+        Container.ExecResult execResult = container.executeJob(job1);
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+        Container.ExecResult readResult = container.executeJob(job2);
+        Assertions.assertEquals(0, readResult.getExitCode(), readResult.getStderr());
+    }
+
+    /** Writes to and reads back a Hive table stored on HDFS. */
+    @TestTemplate
+    public void testFakeSinkHiveOnHDFS(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_hdfs.conf", "/hive_on_hdfs_to_assert.conf");
+    }
+
+    /** Writes to and reads back a Hive table stored on S3. */
+    @TestTemplate
+    public void testFakeSinkHiveOnS3(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_s3.conf", "/hive_on_s3_to_assert.conf");
+    }
+
+    /** Writes to and reads back a Hive table stored on OSS. */
+    @TestTemplate
+    public void testFakeSinkHiveOnOSS(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_oss.conf", "/hive_on_oss_to_assert.conf");
+    }
+
+    /** Writes to and reads back a Hive table stored on COS. */
+    @TestTemplate
+    public void testFakeSinkHiveOnCos(TestContainer container) throws Exception {
+        executeJob(container, "/fake_to_hive_on_cos.conf", "/hive_on_cos_to_assert.conf");
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_cos.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_cos.conf
new file mode 100644
index 0000000000..4e7cc1468b
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_cos.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_cos"
+    metastore_uri = "thrift://hadoop04:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+        bucket="cosn://emr-cosn.com"
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs.conf
new file mode 100644
index 0000000000..db0e9cdc6c
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_hdfs"
+    metastore_uri = "thrift://hadoop04:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_oss.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_oss.conf
new file mode 100644
index 0000000000..a3780e0c07
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_oss.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_oss"
+    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+    }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_s3.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_s3.conf
new file mode 100644
index 0000000000..963c680394
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_s3.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_s3"
+    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+    hive.hadoop.conf = {
+       bucket="s3://ws-package"
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_cos_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_cos_to_assert.conf
new file mode 100644
index 0000000000..bc43786d5a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_cos_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_cos"
+    metastore_uri = "thrift://hadoop04:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+        bucket="cosn://emr-cosn.com"
+    }
+    result_table_name = hive_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = hive_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert.conf
new file mode 100644
index 0000000000..87df421a2d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert.conf
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_hdfs"
+    metastore_uri = "thrift://hadoop04:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    result_table_name = hive_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = hive_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_oss_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_oss_to_assert.conf
new file mode 100644
index 0000000000..4968a825c2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_oss_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_oss"
+    metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    hive.hadoop.conf = {
+        bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
+    }
+    result_table_name = hive_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = hive_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_s3_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_s3_to_assert.conf
new file mode 100644
index 0000000000..e6e5b424e7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_s3_to_assert.conf
@@ -0,0 +1,76 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "test_hive.test_hive_sink_on_s3"
+    metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
+    hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
+    hive.hadoop.conf = {
+       bucket="s3://ws-package"
+    }
+    result_table_name = hive_source
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = hive_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 459c49a4d0..45c78dbf70 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -67,6 +67,7 @@
         <module>connector-easysearch-e2e</module>
         <module>connector-cdc-postgres-e2e</module>
         <module>connector-cdc-oracle-e2e</module>
+        <module>connector-hive-e2e</module>
     </modules>
 
     <dependencies>
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 1f59f302f8..37af5011ef 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -352,7 +352,11 @@ public class SeaTunnelContainer extends AbstractTestContainer {
                 || s.contains(
                         "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")
                 || s.startsWith("Log4j2-TF-")
-                || aqsThread.matcher(s).matches();
+                || aqsThread.matcher(s).matches()
+                // The renewed background thread of the hdfs client
+                || s.startsWith("LeaseRenewer")
+                // The read of hdfs which has the thread that is all in running status
+                || s.startsWith("org.apache.hadoop.hdfs.PeerCache");
     }
 
     private void classLoaderObjectCheck(Integer maxSize) throws IOException, InterruptedException {

Reply via email to