This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4d9287fce4 [Feature] Hive Source/Sink support multiple table (#5929)
4d9287fce4 is described below

commit 4d9287fce4943a30e31e5dbda013c1cee17034ab
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Apr 30 14:41:44 2024 +0800

    [Feature] Hive Source/Sink support multiple table (#5929)
---
 docs/en/connector-v2/sink/Hive.md                  |  52 +++-
 docs/en/connector-v2/source/Hive.md                |  25 +-
 .../api/table/catalog/CatalogTableUtil.java        |  39 +++
 .../hive/commit/HiveSinkAggregatedCommitter.java   |  22 +-
 .../seatunnel/hive/config/BaseHiveOptions.java     |  43 +++
 .../seatunnel/hive/config/HiveConfig.java          |  31 ---
 .../seatunnel/hive/config/HiveConstants.java       |  36 +++
 .../hive/exception/HiveConnectorErrorCode.java     |   4 +-
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 298 +++++++++++---------
 .../seatunnel/hive/sink/HiveSinkFactory.java       |  85 +++++-
 .../{HiveSinkFactory.java => HiveSinkOptions.java} |  32 +--
 .../hive/sink/writter/HiveSinkWriter.java          |  51 ++++
 .../seatunnel/hive/source/HiveSource.java          | 238 ++++------------
 .../seatunnel/hive/source/HiveSourceFactory.java   |  23 +-
 .../hive/source/config/HiveSourceConfig.java       | 299 +++++++++++++++++++++
 .../hive/source/config/HiveSourceOptions.java      |  36 +++
 .../config/MultipleTableHiveSourceConfig.java      |  55 ++++
 .../reader/MultipleTableHiveSourceReader.java      | 129 +++++++++
 .../hive/source/split/HiveSourceSplit.java         |  40 +++
 .../MultipleTableHiveSourceSplitEnumerator.java    | 162 +++++++++++
 .../hive/source/state/HiveSourceState.java         |  38 +++
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  42 ++-
 .../hive/utils/HiveMetaStoreProxyUtils.java        |  13 +-
 .../seatunnel/hive/utils/HiveTableUtils.java       |  63 +++++
 .../seatunnel/hive/utils/HiveTypeConvertor.java    |  59 ++++
 .../seatunnel/hive/source/TypeConvertTest.java     |  51 ----
 .../hive/utils/HiveMetaStoreProxyUtilsTest.java    |   9 +-
 .../hive/utils/HiveTypeConvertorTest.java          |  54 ++++
 .../connector/ConnectorSpecificationCheckTest.java |   1 +
 .../seatunnel-engine-examples/pom.xml              |  50 ++++
 30 files changed, 1590 insertions(+), 490 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md 
b/docs/en/connector-v2/sink/Hive.md
index 245cd5b271..dac4a814c2 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -47,7 +47,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ### table_name [string]
 
-Target Hive table name eg: db1.table1
+Target Hive table name eg: db1.table1, and if the source is multiple mode, you 
can use `${database_name}.${table_name}` to generate the table name, it will 
replace the `${database_name}` and `${table_name}` with the value of the 
CatalogTable generate from the source.
 
 ### metastore_uri [string]
 
@@ -343,6 +343,56 @@ sink {
 }
 ```
 
+### example 2
+
+We have multiple source table like this:
+
+```bash
+create table test_1(
+)
+PARTITIONED BY (xx);
+
+create table test_2(
+)
+PARTITIONED BY (xx);
+...
+```
+
+We need read data from these source tables and write to another tables:
+
+The job config file can like this:
+
+```
+env {
+  # You can set flink configuration here
+  parallelism = 3
+  job.name="test_hive_source_to_hive"
+}
+
+source {
+  Hive {
+    tables_configs = [
+      {
+        table_name = "test_hive.test_1"
+        metastore_uri = "thrift://ctyun6:9083"
+      },
+      {
+        table_name = "test_hive.test_2"
+        metastore_uri = "thrift://ctyun7:9083"
+      }
+    ]
+  }
+}
+
+sink {
+  # choose stdout output plugin to output data to console
+  Hive {
+    table_name = "${database_name}.${table_name}"
+    metastore_uri = "thrift://ctyun7:9083"
+  }
+}
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git a/docs/en/connector-v2/source/Hive.md 
b/docs/en/connector-v2/source/Hive.md
index 5273ca2ee5..bb7b851409 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -59,10 +59,6 @@ Hive metastore uri
 
 The path of `hdfs-site.xml`, used to load ha configuration of namenodes
 
-### hive_site_path [string]
-
-The path of `hive-site.xml`, used to authentication hive metastore
-
 ### read_partitions [list]
 
 The target partitions that user want to read from hive table, if user does not 
set this parameter, it will read all the data from hive table.
@@ -102,6 +98,8 @@ Source plugin common parameters, please refer to [Source 
Common Options](common-
 
 ## Example
 
+### Example 1: Single table
+
 ```bash
 
   Hive {
@@ -111,6 +109,25 @@ Source plugin common parameters, please refer to [Source 
Common Options](common-
 
 ```
 
+### Example 2: Multiple tables
+
+```bash
+
+  Hive {
+    tables_configs = [
+        {
+          table_name = "default.seatunnel_orc_1"
+          metastore_uri = "thrift://namenode001:9083"
+        },
+        {
+          table_name = "default.seatunnel_orc_2"
+          metastore_uri = "thrift://namenode001:9083"
+        }
+    ]
+  }
+
+```
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index b268fe612e..eafaedf05d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -42,6 +42,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /** Utils contains some common methods for construct CatalogTable. */
 @Slf4j
@@ -234,4 +236,41 @@ public class CatalogTableUtil implements Serializable {
     public static CatalogTable buildSimpleTextTable() {
         return getCatalogTable("default", buildSimpleTextSchema());
     }
+
+    public static CatalogTable newCatalogTable(
+            CatalogTable catalogTable, SeaTunnelRowType seaTunnelRowType) {
+        TableSchema tableSchema = catalogTable.getTableSchema();
+
+        Map<String, Column> columnMap =
+                tableSchema.getColumns().stream()
+                        .collect(Collectors.toMap(Column::getName, 
Function.identity()));
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+
+        List<Column> finalColumns = new ArrayList<>();
+        for (int i = 0; i < fieldNames.length; i++) {
+            Column column = columnMap.get(fieldNames[i]);
+            if (column != null) {
+                finalColumns.add(column);
+            } else {
+                finalColumns.add(
+                        PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, 
false, null, null));
+            }
+        }
+
+        TableSchema finalSchema =
+                TableSchema.builder()
+                        .columns(finalColumns)
+                        .primaryKey(tableSchema.getPrimaryKey())
+                        .constraintKey(tableSchema.getConstraintKeys())
+                        .build();
+
+        return CatalogTable.of(
+                catalogTable.getTableId(),
+                finalSchema,
+                catalogTable.getOptions(),
+                catalogTable.getPartitionKeys(),
+                catalogTable.getComment(),
+                catalogTable.getCatalogName());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
index 2f2bc972a0..db4340b373 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.commit;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
+import org.apache.seatunnel.connectors.seatunnel.hive.sink.HiveSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
 import org.apache.thrift.TException;
@@ -33,33 +33,31 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ABORT_DROP_PARTITION_METADATA;
-
 @Slf4j
 public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
-    private final Config pluginConfig;
     private final String dbName;
     private final String tableName;
     private final boolean abortDropPartitionMetadata;
 
+    private final ReadonlyConfig readonlyConfig;
+
     public HiveSinkAggregatedCommitter(
-            Config pluginConfig, String dbName, String tableName, HadoopConf 
hadoopConf) {
+            ReadonlyConfig readonlyConfig, String dbName, String tableName, 
HadoopConf hadoopConf) {
         super(hadoopConf);
-        this.pluginConfig = pluginConfig;
+        this.readonlyConfig = readonlyConfig;
         this.dbName = dbName;
         this.tableName = tableName;
         this.abortDropPartitionMetadata =
-                pluginConfig.hasPath(ABORT_DROP_PARTITION_METADATA.key())
-                        ? 
pluginConfig.getBoolean(ABORT_DROP_PARTITION_METADATA.key())
-                        : ABORT_DROP_PARTITION_METADATA.defaultValue();
+                
readonlyConfig.get(HiveSinkOptions.ABORT_DROP_PARTITION_METADATA);
     }
 
     @Override
     public List<FileAggregatedCommitInfo> commit(
             List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws 
IOException {
+
         List<FileAggregatedCommitInfo> errorCommitInfos = 
super.commit(aggregatedCommitInfos);
         if (errorCommitInfos.isEmpty()) {
-            HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(pluginConfig);
+            HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(readonlyConfig);
             try {
                 for (FileAggregatedCommitInfo aggregatedCommitInfo : 
aggregatedCommitInfos) {
                     Map<String, List<String>> partitionDirAndValuesMap =
@@ -87,7 +85,7 @@ public class HiveSinkAggregatedCommitter extends 
FileSinkAggregatedCommitter {
     public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) 
throws Exception {
         super.abort(aggregatedCommitInfos);
         if (abortDropPartitionMetadata) {
-            HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(pluginConfig);
+            HiveMetaStoreProxy hiveMetaStore = 
HiveMetaStoreProxy.getInstance(readonlyConfig);
             for (FileAggregatedCommitInfo aggregatedCommitInfo : 
aggregatedCommitInfos) {
                 Map<String, List<String>> partitionDirAndValuesMap =
                         aggregatedCommitInfo.getPartitionDirAndValuesMap();
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
new file mode 100644
index 0000000000..efed4e91c5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/BaseHiveOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
+
+public class BaseHiveOptions extends BaseSourceConfigOptions {
+
+    public static final Option<String> TABLE_NAME =
+            Options.key("table_name")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hive table name");
+
+    public static final Option<String> METASTORE_URI =
+            Options.key("metastore_uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hive metastore uri");
+
+    public static final Option<String> HIVE_SITE_PATH =
+            Options.key("hive_site_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The path of hive-site.xml");
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
index 0afadc64d8..714be58619 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -17,14 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -66,29 +60,4 @@ public class HiveConfig {
                     .noDefaultValue()
                     .withDescription(
                             "The specified loading path for the 
'core-site.xml', 'hdfs-site.xml' files");
-
-    public static final String TEXT_INPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.mapred.TextInputFormat";
-    public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
-    public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
-    public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
-    public static final String ORC_INPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
-    public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
-            "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
-
-    public static Pair<String[], Table> getTableInfo(Config config) {
-        String table = config.getString(TABLE_NAME.key());
-        String[] splits = table.split("\\.");
-        if (splits.length != 2) {
-            throw new RuntimeException("Please config " + TABLE_NAME + " as 
db.table format");
-        }
-        HiveMetaStoreProxy hiveMetaStoreProxy = 
HiveMetaStoreProxy.getInstance(config);
-        Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], 
splits[1]);
-        hiveMetaStoreProxy.close();
-        return Pair.of(splits, tableInformation);
-    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java
new file mode 100644
index 0000000000..8539df68a8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+public class HiveConstants {
+
+    public static final String CONNECTOR_NAME = "Hive";
+
+    public static final String TEXT_INPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.mapred.TextInputFormat";
+    public static final String TEXT_OUTPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+    public static final String PARQUET_INPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+    public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+    public static final String ORC_INPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+    public static final String ORC_OUTPUT_FORMAT_CLASSNAME =
+            "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
index 205a9b5dec..a0923acc1b 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HiveConnectorErrorCode.java
@@ -23,7 +23,9 @@ public enum HiveConnectorErrorCode implements 
SeaTunnelErrorCode {
     GET_HDFS_NAMENODE_HOST_FAILED("HIVE-01", "Get name node host from table 
location failed"),
     INITIALIZE_HIVE_METASTORE_CLIENT_FAILED("HIVE-02", "Initialize hive 
metastore client failed"),
     GET_HIVE_TABLE_INFORMATION_FAILED(
-            "HIVE-03", "Get hive table information from hive metastore service 
failed");
+            "HIVE-03", "Get hive table information from hive metastore service 
failed"),
+    HIVE_TABLE_NAME_ERROR("HIVE-04", "Hive table name is invalid"),
+    ;
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 078ce83fd1..b91c65de9b 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -20,110 +20,78 @@ package 
org.apache.seatunnel.connectors.seatunnel.hive.sink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategyFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.commit.HiveSinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.sink.writter.HiveSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 
-import com.google.auto.service.AutoService;
-
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.HAVE_PARTITION;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_BY;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.PARTITION_DIR_EXPRESSION;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.SINK_COLUMNS;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.METASTORE_URI;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TABLE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
-
-@AutoService(SeaTunnelSink.class)
-public class HiveSink extends BaseHdfsFileSink {
-    private String dbName;
-    private String tableName;
-    private Table tableInformation;
 
-    @Override
-    public String getPluginName() {
-        return "Hive";
+public class HiveSink
+        implements SeaTunnelSink<
+                        SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo>,
+                SupportMultiTableSink {
+
+    // Since Table might contain some unserializable fields, we need to make 
it transient
+    // And use getTableInformation to get the Table object
+    private transient Table tableInformation;
+    private final CatalogTable catalogTable;
+    private final ReadonlyConfig readonlyConfig;
+    private final HadoopConf hadoopConf;
+    private final FileSinkConfig fileSinkConfig;
+    private transient WriteStrategy writeStrategy;
+    private String jobId;
+
+    public HiveSink(ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        this.readonlyConfig = readonlyConfig;
+        this.catalogTable = catalogTable;
+        this.tableInformation = getTableInformation();
+        this.hadoopConf = createHadoopConf(readonlyConfig);
+        this.fileSinkConfig = generateFileSinkConfig(readonlyConfig, 
catalogTable);
+        this.writeStrategy = getWriteStrategy();
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(pluginConfig, 
METASTORE_URI.key(), TABLE_NAME.key());
-        if (!result.isSuccess()) {
-            throw new HiveConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        result =
-                CheckConfigUtil.checkAtLeastOneExists(
-                        pluginConfig,
-                        FILE_FORMAT_TYPE.key(),
-                        FILE_PATH.key(),
-                        FIELD_DELIMITER.key(),
-                        ROW_DELIMITER.key(),
-                        IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
-                        PARTITION_DIR_EXPRESSION.key(),
-                        HAVE_PARTITION.key(),
-                        SINK_COLUMNS.key(),
-                        PARTITION_BY.key());
-        if (result.isSuccess()) {
-            throw new HiveConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "Hive sink connector does not support these 
setting [%s]",
-                            String.join(
-                                    ",",
-                                    FILE_FORMAT_TYPE.key(),
-                                    FILE_PATH.key(),
-                                    FIELD_DELIMITER.key(),
-                                    ROW_DELIMITER.key(),
-                                    IS_PARTITION_FIELD_WRITE_IN_FILE.key(),
-                                    PARTITION_DIR_EXPRESSION.key(),
-                                    HAVE_PARTITION.key(),
-                                    SINK_COLUMNS.key(),
-                                    PARTITION_BY.key())));
-        }
-        Pair<String[], Table> tableInfo = 
HiveConfig.getTableInfo(pluginConfig);
-        dbName = tableInfo.getLeft()[0];
-        tableName = tableInfo.getLeft()[1];
-        tableInformation = tableInfo.getRight();
+    private FileSinkConfig generateFileSinkConfig(
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        Table tableInformation = getTableInformation();
+        Config pluginConfig = readonlyConfig.toConfig();
         List<String> sinkFields =
                 tableInformation.getSd().getCols().stream()
                         .map(FieldSchema::getName)
@@ -133,35 +101,42 @@ public class HiveSink extends BaseHdfsFileSink {
                         .map(FieldSchema::getName)
                         .collect(Collectors.toList());
         sinkFields.addAll(partitionKeys);
-        String outputFormat = tableInformation.getSd().getOutputFormat();
-        if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
-            Map<String, String> parameters =
-                    tableInformation.getSd().getSerdeInfo().getParameters();
-            pluginConfig =
-                    pluginConfig
-                            .withValue(
-                                    FILE_FORMAT_TYPE.key(),
-                                    
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
-                            .withValue(
-                                    FIELD_DELIMITER.key(),
-                                    
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
-                            .withValue(
-                                    ROW_DELIMITER.key(),
-                                    
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
-        } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
-            pluginConfig =
-                    pluginConfig.withValue(
-                            FILE_FORMAT_TYPE.key(),
-                            
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
-        } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
-            pluginConfig =
-                    pluginConfig.withValue(
-                            FILE_FORMAT_TYPE.key(),
-                            
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
-        } else {
-            throw new HiveConnectorException(
-                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                    "Hive connector only support [text parquet orc] table 
now");
+
+        FileFormat fileFormat = 
HiveTableUtils.parseFileFormat(tableInformation);
+        switch (fileFormat) {
+            case TEXT:
+                Map<String, String> parameters =
+                        
tableInformation.getSd().getSerdeInfo().getParameters();
+                pluginConfig =
+                        pluginConfig
+                                .withValue(
+                                        FILE_FORMAT_TYPE.key(),
+                                        
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
+                                .withValue(
+                                        FIELD_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(
+                                                parameters.get("field.delim")))
+                                .withValue(
+                                        ROW_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(
+                                                parameters.get("line.delim")));
+                break;
+            case PARQUET:
+                pluginConfig =
+                        pluginConfig.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+                break;
+            case ORC:
+                pluginConfig =
+                        pluginConfig.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+                break;
+            default:
+                throw new HiveConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "Hive connector only support [text parquet orc] table 
now");
         }
         pluginConfig =
                 pluginConfig
@@ -178,42 +153,97 @@ public class HiveSink extends BaseHdfsFileSink {
                         .withValue(SINK_COLUMNS.key(), 
ConfigValueFactory.fromAnyRef(sinkFields))
                         .withValue(
                                 PARTITION_BY.key(), 
ConfigValueFactory.fromAnyRef(partitionKeys));
-        String hiveSdLocation = tableInformation.getSd().getLocation();
-        try {
-            /**
-             * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop 
conf can be
-             * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy 
can obtain the
-             * correct Schema and FsHdfsImpl that can be filled into hadoop 
configuration in {@link
-             * 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
-             */
-            hadoopConf =
-                    StorageFactory.getStorageType(hiveSdLocation)
-                            .buildHadoopConfWithReadOnlyConfig(
-                                    ReadonlyConfig.fromConfig(pluginConfig));
-            String path = new URI(hiveSdLocation).getPath();
-            pluginConfig =
-                    pluginConfig
-                            .withValue(FILE_PATH.key(), 
ConfigValueFactory.fromAnyRef(path))
-                            .withValue(
-                                    FS_DEFAULT_NAME_KEY,
-                                    
ConfigValueFactory.fromAnyRef(hadoopConf.getHdfsNameKey()));
-        } catch (URISyntaxException e) {
-            String errorMsg =
-                    String.format(
-                            "Get hdfs namenode host from table location [%s] 
failed,"
-                                    + "please check it",
-                            hiveSdLocation);
-            throw new HiveConnectorException(
-                    HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, 
errorMsg, e);
-        }
-        this.pluginConfig = pluginConfig;
-        super.prepare(pluginConfig);
+
+        return new FileSinkConfig(pluginConfig, 
catalogTable.getSeaTunnelRowType());
+    }
+
+    @Override
+    public String getPluginName() {
+        return HiveConstants.CONNECTOR_NAME;
     }
 
     @Override
     public Optional<SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo>>
             createAggregatedCommitter() {
         return Optional.of(
-                new HiveSinkAggregatedCommitter(pluginConfig, dbName, 
tableName, hadoopConf));
+                new HiveSinkAggregatedCommitter(
+                        readonlyConfig,
+                        getTableInformation().getDbName(),
+                        getTableInformation().getTableName(),
+                        hadoopConf));
+    }
+
+    @Override
+    public void setJobContext(JobContext jobContext) {
+        this.jobId = jobContext.getJobId();
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
restoreWriter(
+            SinkWriter.Context context, List<FileSinkState> states) {
+        return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context, 
jobId, states);
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
createWriter(
+            SinkWriter.Context context) {
+        return new HiveSinkWriter(getWriteStrategy(), hadoopConf, context, 
jobId);
+    }
+
+    @Override
+    public Optional<Serializer<FileCommitInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<FileAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<FileSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    private HadoopConf createHadoopConf(ReadonlyConfig readonlyConfig) {
+        String hdfsLocation = getTableInformation().getSd().getLocation();
+
+        /**
+         * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop 
conf can be
+         * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy can 
obtain the correct
+         * Schema and FsHdfsImpl that can be filled into hadoop configuration 
in {@link
+         * 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+         */
+        HadoopConf hadoopConf =
+                StorageFactory.getStorageType(hdfsLocation)
+                        .buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        readonlyConfig
+                .getOptional(HiveSourceOptions.HDFS_SITE_PATH)
+                .ifPresent(hadoopConf::setHdfsSitePath);
+        readonlyConfig
+                .getOptional(HiveSourceOptions.REMOTE_USER)
+                .ifPresent(hadoopConf::setRemoteUser);
+        readonlyConfig
+                .getOptional(HiveSourceOptions.KERBEROS_PRINCIPAL)
+                .ifPresent(hadoopConf::setKerberosPrincipal);
+        readonlyConfig
+                .getOptional(HiveSourceOptions.KERBEROS_KEYTAB_PATH)
+                .ifPresent(hadoopConf::setKerberosKeytabPath);
+        return hadoopConf;
+    }
+
+    private Table getTableInformation() {
+        if (tableInformation == null) {
+            tableInformation = HiveTableUtils.getTableInfo(readonlyConfig);
+        }
+        return tableInformation;
+    }
+
+    private WriteStrategy getWriteStrategy() {
+        if (writeStrategy == null) {
+            writeStrategy = 
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
+            
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+        }
+        return writeStrategy;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
index e40864517f..e53aed86fc 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
@@ -17,19 +17,32 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.sink.SinkReplaceNameConstant;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
 
 import com.google.auto.service.AutoService;
 
+import java.util.HashMap;
+import java.util.Map;
+
 @AutoService(Factory.class)
-public class HiveSinkFactory implements TableSinkFactory {
-    @Override
-    public String factoryIdentifier() {
-        return "Hive";
-    }
+public class HiveSinkFactory
+        implements TableSinkFactory<
+                SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo> {
 
     @Override
     public OptionRule optionRule() {
@@ -37,8 +50,70 @@ public class HiveSinkFactory implements TableSinkFactory {
                 .required(HiveConfig.TABLE_NAME)
                 .required(HiveConfig.METASTORE_URI)
                 .optional(HiveConfig.ABORT_DROP_PARTITION_METADATA)
+                .optional(BaseSinkConfig.KERBEROS_PRINCIPAL)
+                .optional(BaseSinkConfig.KERBEROS_KEYTAB_PATH)
+                .optional(BaseSinkConfig.REMOTE_USER)
                 .optional(HiveConfig.HADOOP_CONF)
                 .optional(HiveConfig.HADOOP_CONF_PATH)
                 .build();
     }
+
+    @Override
+    public TableSink<SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo>
+            createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+
+        ReadonlyConfig finalReadonlyConfig =
+                generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+        return () -> new HiveSink(finalReadonlyConfig, catalogTable);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return HiveConstants.CONNECTOR_NAME;
+    }
+
+    private ReadonlyConfig generateCurrentReadonlyConfig(
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+
+        Map<String, String> configMap = readonlyConfig.toMap();
+
+        readonlyConfig
+                .getOptional(HiveSinkOptions.TABLE_NAME)
+                .ifPresent(
+                        tableName -> {
+                            String replacedPath =
+                                    replaceCatalogTableInPath(tableName, 
catalogTable);
+                            configMap.put(HiveSinkOptions.TABLE_NAME.key(), 
replacedPath);
+                        });
+
+        return ReadonlyConfig.fromMap(new HashMap<>(configMap));
+    }
+
+    private String replaceCatalogTableInPath(String originTableName, 
CatalogTable catalogTable) {
+        String tableName = originTableName;
+        TableIdentifier tableIdentifier = catalogTable.getTableId();
+        if (tableIdentifier != null) {
+            if (tableIdentifier.getDatabaseName() != null) {
+                tableName =
+                        tableName.replace(
+                                
SinkReplaceNameConstant.REPLACE_DATABASE_NAME_KEY,
+                                tableIdentifier.getDatabaseName());
+            }
+            if (tableIdentifier.getSchemaName() != null) {
+                tableName =
+                        tableName.replace(
+                                
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+                                tableIdentifier.getSchemaName());
+            }
+            if (tableIdentifier.getTableName() != null) {
+                tableName =
+                        tableName.replace(
+                                SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
+                                tableIdentifier.getTableName());
+            }
+        }
+        return tableName;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
similarity index 50%
copy from 
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
copy to 
seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
index e40864517f..a241717a44 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkOptions.java
@@ -17,28 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
 
-import com.google.auto.service.AutoService;
+public class HiveSinkOptions extends BaseHiveOptions {
 
-@AutoService(Factory.class)
-public class HiveSinkFactory implements TableSinkFactory {
-    @Override
-    public String factoryIdentifier() {
-        return "Hive";
-    }
-
-    @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder()
-                .required(HiveConfig.TABLE_NAME)
-                .required(HiveConfig.METASTORE_URI)
-                .optional(HiveConfig.ABORT_DROP_PARTITION_METADATA)
-                .optional(HiveConfig.HADOOP_CONF)
-                .optional(HiveConfig.HADOOP_CONF_PATH)
-                .build();
-    }
+    public static final Option<Boolean> ABORT_DROP_PARTITION_METADATA =
+            Options.key("abort_drop_partition_metadata")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Flag to decide whether to drop partition metadata 
from Hive Metastore during an abort operation. Note: this only affects the 
metadata in the metastore, the data in the partition will always be 
deleted(data generated during the synchronization process).");
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java
new file mode 100644
index 0000000000..a00f559082
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/writter/HiveSinkWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.sink.writter;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSinkWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
+import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+public class HiveSinkWriter extends BaseFileSinkWriter
+        implements SupportMultiTableSinkWriter<WriteStrategy> {
+
+    public HiveSinkWriter(
+            WriteStrategy writeStrategy,
+            HadoopConf hadoopConf,
+            Context context,
+            String jobId,
+            List<FileSinkState> fileSinkStates) {
+        // todo: do we need to set writeStrategy as share resource? then how 
to deal with the pre
+        // fileSinkStates?
+        super(writeStrategy, hadoopConf, context, jobId, fileSinkStates);
+    }
+
+    public HiveSinkWriter(
+            WriteStrategy writeStrategy,
+            HadoopConf hadoopConf,
+            SinkWriter.Context context,
+            String jobId) {
+        this(writeStrategy, hadoopConf, context, jobId, 
Collections.emptyList());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index f7642e3611..00f2bd0a8a 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -17,213 +17,67 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.source;
 
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SqlType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.common.exception.CommonError;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.BaseHdfsFileSource;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.reader.MultipleTableHiveSourceReader;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.split.MultipleTableHiveSourceSplitEnumerator;
 
-import com.google.auto.service.AutoService;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.FILE_FORMAT_TYPE;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.FILE_PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_INPUT_FORMAT_CLASSNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_INPUT_FORMAT_CLASSNAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_INPUT_FORMAT_CLASSNAME;
+import java.util.stream.Collectors;
 
-@AutoService(SeaTunnelSource.class)
 public class HiveSource extends BaseHdfsFileSource {
-    private Table tableInformation;
+
+    private final MultipleTableHiveSourceConfig multipleTableHiveSourceConfig;
+
+    public HiveSource(ReadonlyConfig readonlyConfig) {
+        this.multipleTableHiveSourceConfig = new 
MultipleTableHiveSourceConfig(readonlyConfig);
+    }
 
     @Override
     public String getPluginName() {
-        return "Hive";
+        return HiveConstants.CONNECTOR_NAME;
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, HiveConfig.METASTORE_URI.key(), 
HiveConfig.TABLE_NAME.key());
-        if (!result.isSuccess()) {
-            throw new HiveConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        result =
-                CheckConfigUtil.checkAtLeastOneExists(
-                        pluginConfig,
-                        TableSchemaOptions.SCHEMA.key(),
-                        FILE_FORMAT_TYPE.key(),
-                        FILE_PATH.key(),
-                        FS_DEFAULT_NAME_KEY);
-        if (result.isSuccess()) {
-            throw new HiveConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "Hive source connector does not support these 
setting [%s]",
-                            String.join(
-                                    ",",
-                                    TableSchemaOptions.SCHEMA.key(),
-                                    FILE_FORMAT_TYPE.key(),
-                                    FILE_PATH.key(),
-                                    FS_DEFAULT_NAME_KEY)));
-        }
-        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.READ_PARTITIONS.key())) {
-            // verify partition list
-            List<String> partitionsList =
-                    
pluginConfig.getStringList(BaseSourceConfigOptions.READ_PARTITIONS.key());
-            if (partitionsList.isEmpty()) {
-                throw new HiveConnectorException(
-                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                        "Partitions list is empty, please check");
-            }
-            int depth = partitionsList.get(0).replaceAll("\\\\", 
"/").split("/").length;
-            long count =
-                    partitionsList.stream()
-                            .map(partition -> partition.replaceAll("\\\\", 
"/").split("/").length)
-                            .filter(length -> length != depth)
-                            .count();
-            if (count > 0) {
-                throw new HiveConnectorException(
-                        SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                        "Every partition that in partition list should has the 
same directory depth");
-            }
-        }
-        Pair<String[], Table> tableInfo = 
HiveConfig.getTableInfo(pluginConfig);
-        tableInformation = tableInfo.getRight();
-        String inputFormat = tableInformation.getSd().getInputFormat();
-        if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
-            pluginConfig =
-                    pluginConfig.withValue(
-                            FILE_FORMAT_TYPE.key(),
-                            
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
-            // Build schema from hive table information
-            // Because the entrySet in typesafe config couldn't keep key-value 
order
-            // So use jackson to keep key-value order
-            Map<String, Object> schema = parseSchema(tableInformation);
-            ConfigRenderOptions options = ConfigRenderOptions.concise();
-            String render = pluginConfig.root().render(options);
-            ObjectNode jsonNodes = JsonUtils.parseObject(render);
-            jsonNodes.putPOJO(TableSchemaOptions.SCHEMA.key(), schema);
-            pluginConfig = ConfigFactory.parseString(jsonNodes.toString());
-        } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
-            pluginConfig =
-                    pluginConfig.withValue(
-                            FILE_FORMAT_TYPE.key(),
-                            
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
-        } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
-            pluginConfig =
-                    pluginConfig.withValue(
-                            FILE_FORMAT_TYPE.key(),
-                            
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
-        } else {
-            throw new HiveConnectorException(
-                    CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
-                    "Hive connector only support [text parquet orc] table 
now");
-        }
-        String hiveSdLocation = tableInformation.getSd().getLocation();
-        try {
-            URI uri = new URI(hiveSdLocation);
-            String path = uri.getPath();
-            String defaultFs = hiveSdLocation.replace(path, "");
-            pluginConfig =
-                    pluginConfig
-                            .withValue(
-                                    BaseSourceConfigOptions.FILE_PATH.key(),
-                                    ConfigValueFactory.fromAnyRef(path))
-                            .withValue(
-                                    FS_DEFAULT_NAME_KEY, 
ConfigValueFactory.fromAnyRef(defaultFs));
-            /**
-             * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop 
conf can be
-             * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy 
can obtain the
-             * correct Schema and FsHdfsImpl that can be filled into hadoop 
configuration in {@link
-             * 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
-             */
-            hadoopConf =
-                    StorageFactory.getStorageType(hiveSdLocation)
-                            .buildHadoopConfWithReadOnlyConfig(
-                                    ReadonlyConfig.fromConfig(pluginConfig));
-        } catch (URISyntaxException e) {
-            String errorMsg =
-                    String.format(
-                            "Get hdfs namenode host from table location [%s] 
failed,"
-                                    + "please check it",
-                            hiveSdLocation);
-            throw new HiveConnectorException(
-                    HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, 
errorMsg, e);
-        }
-        super.prepare(pluginConfig);
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
     }
 
-    private Map<String, Object> parseSchema(Table table) {
-        LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
-        LinkedHashMap<String, Object> schema = new LinkedHashMap<>();
-        List<FieldSchema> cols = table.getSd().getCols();
-        for (FieldSchema col : cols) {
-            String name = col.getName();
-            String type = col.getType();
-            fields.put(name, covertHiveTypeToSeaTunnelType(name, type));
-        }
-        schema.put("fields", fields);
-        return schema;
+    @Override
+    public List<CatalogTable> getProducedCatalogTables() {
+        return multipleTableHiveSourceConfig.getHiveSourceConfigs().stream()
+                .map(HiveSourceConfig::getCatalogTable)
+                .collect(Collectors.toList());
     }
 
-    private Object covertHiveTypeToSeaTunnelType(String name, String hiveType) 
{
-        if (hiveType.contains("varchar")) {
-            return SqlType.STRING;
-        }
-        if (hiveType.contains("char")) {
-            throw CommonError.convertToSeaTunnelTypeError(
-                    getPluginName(), PluginType.SOURCE, hiveType, name);
-        }
-        if (hiveType.contains("binary")) {
-            return SqlType.BYTES.name();
-        }
-        if (hiveType.contains("struct")) {
-            LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
-            int start = hiveType.indexOf("<");
-            int end = hiveType.lastIndexOf(">");
-            String[] columns = hiveType.substring(start + 1, end).split(",");
-            for (String column : columns) {
-                String[] splits = column.split(":");
-                fields.put(splits[0], covertHiveTypeToSeaTunnelType(splits[0], 
splits[1]));
-            }
-            return fields;
-        }
-        return hiveType;
+    @Override
+    public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(
+            SourceReader.Context readerContext) {
+        return new MultipleTableHiveSourceReader(readerContext, 
multipleTableHiveSourceConfig);
+    }
+
+    @Override
+    public SourceSplitEnumerator<FileSourceSplit, FileSourceState> 
createEnumerator(
+            SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) {
+        return new MultipleTableHiveSourceSplitEnumerator(
+                enumeratorContext, multipleTableHiveSourceConfig);
+    }
+
+    @Override
+    public SourceSplitEnumerator<FileSourceSplit, FileSourceState> 
restoreEnumerator(
+            SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext,
+            FileSourceState checkpointState) {
+        return new MultipleTableHiveSourceSplitEnumerator(
+                enumeratorContext, multipleTableHiveSourceConfig, 
checkpointState);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index afa1ae0e36..07adfef106 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -19,27 +19,44 @@ package 
org.apache.seatunnel.connectors.seatunnel.hive.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
 @AutoService(Factory.class)
 public class HiveSourceFactory implements TableSourceFactory {
     @Override
     public String factoryIdentifier() {
-        return "Hive";
+        return HiveConstants.CONNECTOR_NAME;
+    }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () -> (SeaTunnelSource<T, SplitT, StateT>) new 
HiveSource(context.getOptions());
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(HiveConfig.TABLE_NAME)
-                .required(HiveConfig.METASTORE_URI)
+                .optional(HiveConfig.TABLE_NAME)
+                .optional(HiveConfig.METASTORE_URI)
+                .optional(HiveSourceOptions.TABLE_CONFIGS)
                 .optional(BaseSourceConfigOptions.READ_PARTITIONS)
                 .optional(BaseSourceConfigOptions.READ_COLUMNS)
+                .optional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL)
+                .optional(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH)
+                .optional(BaseSourceConfigOptions.REMOTE_USER)
                 .optional(HiveConfig.HADOOP_CONF)
                 .optional(HiveConfig.HADOOP_CONF_PATH)
                 .build();
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
new file mode 100644
index 0000000000..203491dcf9
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTableUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import lombok.Getter;
+import lombok.SneakyThrows;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
+
+@Getter
+public class HiveSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Table table;
+    private final CatalogTable catalogTable;
+    private final FileFormat fileFormat;
+    private final ReadStrategy readStrategy;
+    private final List<String> filePaths;
+    private final HadoopConf hadoopConf;
+
+    @SneakyThrows
+    public HiveSourceConfig(ReadonlyConfig readonlyConfig) {
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.READ_PARTITIONS)
+                .ifPresent(this::validatePartitions);
+        this.table = HiveTableUtils.getTableInfo(readonlyConfig);
+        this.hadoopConf = parseHiveHadoopConfig(readonlyConfig, table);
+        this.fileFormat = HiveTableUtils.parseFileFormat(table);
+        this.readStrategy = parseReadStrategy(table, readonlyConfig, 
fileFormat, hadoopConf);
+        this.filePaths = parseFilePaths(table, readStrategy);
+        this.catalogTable =
+                parseCatalogTable(
+                        readonlyConfig, readStrategy, fileFormat, hadoopConf, 
filePaths, table);
+    }
+
+    private void validatePartitions(List<String> partitionsList) {
+        if (CollectionUtils.isEmpty(partitionsList)) {
+            throw new HiveConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "Partitions list is empty, please check");
+        }
+        int depth = partitionsList.get(0).replaceAll("\\\\", 
"/").split("/").length;
+        long count =
+                partitionsList.stream()
+                        .map(partition -> partition.replaceAll("\\\\", 
"/").split("/").length)
+                        .filter(length -> length != depth)
+                        .count();
+        if (count > 0) {
+            throw new HiveConnectorException(
+                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    "Every partition that in partition list should has the 
same directory depth");
+        }
+    }
+
+    private ReadStrategy parseReadStrategy(
+            Table table,
+            ReadonlyConfig readonlyConfig,
+            FileFormat fileFormat,
+            HadoopConf hadoopConf) {
+
+        ReadStrategy readStrategy = ReadStrategyFactory.of(fileFormat.name());
+        Config config = readonlyConfig.toConfig();
+
+        switch (fileFormat) {
+            case TEXT:
+                // if the file format is text, we set the delim.
+                Map<String, String> parameters = 
table.getSd().getSerdeInfo().getParameters();
+                config =
+                        config.withValue(
+                                        FIELD_DELIMITER.key(),
+                                        ConfigValueFactory.fromAnyRef(
+                                                parameters.get("field.delim")))
+                                .withValue(
+                                        ROW_DELIMITER.key(),
+                                        
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")))
+                                .withValue(
+                                        FILE_FORMAT_TYPE.key(),
+                                        
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.name()));
+                break;
+            case ORC:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                
ConfigValueFactory.fromAnyRef(FileFormat.ORC.name()));
+                break;
+            case PARQUET:
+                config =
+                        config.withValue(
+                                FILE_FORMAT_TYPE.key(),
+                                
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.name()));
+                break;
+            default:
+        }
+        readStrategy.setPluginConfig(config);
+        readStrategy.init(hadoopConf);
+        return readStrategy;
+    }
+
+    private HadoopConf parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, 
Table table) {
+        String hiveSdLocation = table.getSd().getLocation();
+        /**
+         * Build hadoop conf(support s3、cos、oss、hdfs). The returned hadoop 
conf can be
+         * CosConf、OssConf、S3Conf、HadoopConf so that HadoopFileSystemProxy can 
obtain the correct
+         * Schema and FsHdfsImpl that can be filled into hadoop configuration 
in {@link
+         * 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy#createConfiguration()}
+         */
+        HadoopConf hadoopConf =
+                StorageFactory.getStorageType(hiveSdLocation)
+                        .buildHadoopConfWithReadOnlyConfig(readonlyConfig);
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH)
+                .ifPresent(hadoopConf::setHdfsSitePath);
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL)
+                .ifPresent(hadoopConf::setKerberosPrincipal);
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH)
+                .ifPresent(hadoopConf::setKerberosKeytabPath);
+        readonlyConfig
+                .getOptional(HdfsSourceConfigOptions.REMOTE_USER)
+                .ifPresent(hadoopConf::setRemoteUser);
+        return hadoopConf;
+    }
+
+    private List<String> parseFilePaths(Table table, ReadStrategy 
readStrategy) {
+        String hdfsPath = parseHdfsPath(table);
+        try {
+            return readStrategy.getFileNamesByPath(hdfsPath);
+        } catch (Exception e) {
+            String errorMsg = String.format("Get file list from this path [%s] 
failed", hdfsPath);
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
+        }
+    }
+
+    private String parseFsDefaultName(Table table) {
+        String hdfsLocation = table.getSd().getLocation();
+        try {
+            URI uri = new URI(hdfsLocation);
+            String path = uri.getPath();
+            return hdfsLocation.replace(path, "");
+        } catch (URISyntaxException e) {
+            String errorMsg =
+                    String.format(
+                            "Get hdfs namenode host from table location [%s] 
failed,"
+                                    + "please check it",
+                            hdfsLocation);
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, 
errorMsg, e);
+        }
+    }
+
+    private String parseHdfsPath(Table table) {
+        String hdfsLocation = table.getSd().getLocation();
+        try {
+            URI uri = new URI(hdfsLocation);
+            return uri.getPath();
+        } catch (URISyntaxException e) {
+            String errorMsg =
+                    String.format(
+                            "Get hdfs namenode host from table location [%s] 
failed,"
+                                    + "please check it",
+                            hdfsLocation);
+            throw new HiveConnectorException(
+                    HiveConnectorErrorCode.GET_HDFS_NAMENODE_HOST_FAILED, 
errorMsg, e);
+        }
+    }
+
+    private CatalogTable parseCatalogTable(
+            ReadonlyConfig readonlyConfig,
+            ReadStrategy readStrategy,
+            FileFormat fileFormat,
+            HadoopConf hadoopConf,
+            List<String> filePaths,
+            Table table) {
+        switch (fileFormat) {
+            case PARQUET:
+            case ORC:
+                return parseCatalogTableFromRemotePath(
+                        readonlyConfig, hadoopConf, filePaths, table);
+            case TEXT:
+                return parseCatalogTableFromTable(readonlyConfig, 
readStrategy, table);
+            default:
+                throw new HiveConnectorException(
+                        CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                        "Hive connector only support [text parquet orc] table 
now");
+        }
+    }
+
+    private CatalogTable parseCatalogTableFromRemotePath(
+            ReadonlyConfig readonlyConfig,
+            HadoopConf hadoopConf,
+            List<String> filePaths,
+            Table table) {
+        if (CollectionUtils.isEmpty(filePaths)) {
+            // When the directory is empty, distribute default behavior schema
+            return buildEmptyCatalogTable(readonlyConfig, table);
+        }
+        CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, 
table);
+        try {
+            SeaTunnelRowType seaTunnelRowTypeInfo =
+                    readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
+            return CatalogTableUtil.newCatalogTable(catalogTable, 
seaTunnelRowTypeInfo);
+        } catch (FileConnectorException e) {
+            String errorMsg =
+                    String.format("Get table schema from file [%s] failed", 
filePaths.get(0));
+            throw new FileConnectorException(
+                    CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED, 
errorMsg, e);
+        }
+    }
+
+    private CatalogTable parseCatalogTableFromTable(
+            ReadonlyConfig readonlyConfig, ReadStrategy readStrategy, Table 
table) {
+        List<FieldSchema> cols = table.getSd().getCols();
+        String[] fieldNames = new String[cols.size()];
+        SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[cols.size()];
+        for (int i = 0; i < cols.size(); i++) {
+            FieldSchema col = cols.get(i);
+            fieldNames[i] = col.getName();
+            fieldTypes[i] =
+                    
HiveTypeConvertor.covertHiveTypeToSeaTunnelType(col.getName(), col.getType());
+        }
+
+        SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, 
fieldTypes);
+        readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        final SeaTunnelRowType finalSeatunnelRowType = 
readStrategy.getActualSeaTunnelRowTypeInfo();
+
+        CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, 
table);
+        return CatalogTableUtil.newCatalogTable(catalogTable, 
finalSeatunnelRowType);
+    }
+
+    private CatalogTable buildEmptyCatalogTable(ReadonlyConfig readonlyConfig, 
Table table) {
+        TablePath tablePath = TablePath.of(table.getDbName(), 
table.getTableName());
+        return CatalogTable.of(
+                TableIdentifier.of(HiveConstants.CONNECTOR_NAME, tablePath),
+                TableSchema.builder().build(),
+                new HashMap<>(),
+                new ArrayList<>(),
+                
readonlyConfig.get(TableSchemaOptions.TableIdentifierOptions.COMMENT));
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
new file mode 100644
index 0000000000..c30cb1783d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceOptions.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.BaseHiveOptions;
+
+import java.util.List;
+import java.util.Map;
+
+public class HiveSourceOptions extends BaseHiveOptions {
+    public static final Option<List<Map<String, Object>>> TABLE_CONFIGS =
+            Options.key("tables_configs")
+                    .type(new TypeReference<List<Map<String, Object>>>() {})
+                    .noDefaultValue()
+                    .withDescription(
+                            "Local file source configs, used to create 
multiple local file source.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
new file mode 100644
index 0000000000..9db899ca8c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import com.google.common.collect.Lists;
+import lombok.Getter;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MultipleTableHiveSourceConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter private List<HiveSourceConfig> hiveSourceConfigs;
+
+    public MultipleTableHiveSourceConfig(ReadonlyConfig readonlyConfig) {
+        if 
(readonlyConfig.getOptional(HiveSourceOptions.TABLE_CONFIGS).isPresent()) {
+            parseFromLocalFileSourceConfigs(readonlyConfig);
+        } else {
+            parseFromLocalFileSourceConfig(readonlyConfig);
+        }
+    }
+
+    private void parseFromLocalFileSourceConfigs(ReadonlyConfig 
readonlyConfig) {
+        this.hiveSourceConfigs =
+                readonlyConfig.get(HiveSourceOptions.TABLE_CONFIGS).stream()
+                        .map(ReadonlyConfig::fromMap)
+                        .map(HiveSourceConfig::new)
+                        .collect(Collectors.toList());
+    }
+
+    private void parseFromLocalFileSourceConfig(ReadonlyConfig 
localFileSourceRootConfig) {
+        HiveSourceConfig hiveSourceConfig = new 
HiveSourceConfig(localFileSourceRootConfig);
+        this.hiveSourceConfigs = Lists.newArrayList(hiveSourceConfig);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
new file mode 100644
index 0000000000..9ea6f1e632
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_FAILED;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode.FILE_READ_STRATEGY_NOT_SUPPORT;
+
+@Slf4j
+public class MultipleTableHiveSourceReader implements 
SourceReader<SeaTunnelRow, FileSourceSplit> {
+
+    private final SourceReader.Context context;
+    private volatile boolean noMoreSplit;
+
+    private final Deque<FileSourceSplit> sourceSplits = new 
ConcurrentLinkedDeque<>();
+
+    private final Map<String, ReadStrategy> readStrategyMap;
+
+    public MultipleTableHiveSourceReader(
+            SourceReader.Context context,
+            MultipleTableHiveSourceConfig multipleTableHiveSourceConfig) {
+        this.context = context;
+        this.readStrategyMap =
+                multipleTableHiveSourceConfig.getHiveSourceConfigs().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        localFileSourceConfig ->
+                                                localFileSourceConfig
+                                                        .getCatalogTable()
+                                                        .getTableId()
+                                                        .toTablePath()
+                                                        .toString(),
+                                        HiveSourceConfig::getReadStrategy));
+    }
+
+    @Override
+    public void pollNext(Collector<SeaTunnelRow> output) {
+        synchronized (output.getCheckpointLock()) {
+            FileSourceSplit split = sourceSplits.poll();
+            if (null != split) {
+                ReadStrategy readStrategy = 
readStrategyMap.get(split.getTableId());
+                if (readStrategy == null) {
+                    throw new FileConnectorException(
+                            FILE_READ_STRATEGY_NOT_SUPPORT,
+                            "Cannot found the read strategy for this table: ["
+                                    + split.getTableId()
+                                    + "]");
+                }
+                try {
+                    readStrategy.read(split.getFilePath(), split.getTableId(), 
output);
+                } catch (Exception e) {
+                    String errorMsg =
+                            String.format("Read data from this file [%s] 
failed", split.splitId());
+                    throw new FileConnectorException(FILE_READ_FAILED, 
errorMsg, e);
+                }
+            } else if (noMoreSplit && sourceSplits.isEmpty()) {
+                // signal to the source that we have reached the end of the 
data.
+                log.info(
+                        "There is no more element for the bounded 
MultipleTableLocalFileSourceReader");
+                context.signalNoMoreElement();
+            }
+        }
+    }
+
+    @Override
+    public List<FileSourceSplit> snapshotState(long checkpointId) {
+        return new ArrayList<>(sourceSplits);
+    }
+
+    @Override
+    public void addSplits(List<FileSourceSplit> splits) {
+        sourceSplits.addAll(splits);
+    }
+
+    @Override
+    public void handleNoMoreSplits() {
+        noMoreSplit = true;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        // do nothing
+    }
+
+    @Override
+    public void open() throws Exception {
+        // do nothing
+        log.info("Opened the MultipleTableHiveSourceReader");
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+        log.info("Closed the MultipleTableHiveSourceReader");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java
new file mode 100644
index 0000000000..58ebb808d5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/HiveSourceSplit.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplit;
+
+import lombok.Getter;
+
+public class HiveSourceSplit implements SourceSplit {
+
+    private static final long serialVersionUID = 1L;
+
+    @Getter private final String tableId;
+    @Getter private final String filePath;
+
+    public HiveSourceSplit(String tableId, String filePath) {
+        this.tableId = tableId;
+        this.filePath = filePath;
+    }
+
+    @Override
+    public String splitId() {
+        return tableId + "_" + filePath;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
new file mode 100644
index 0000000000..aa4701e37b
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/split/MultipleTableHiveSourceSplitEnumerator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.split;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
+import 
org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.MultipleTableHiveSourceConfig;
+
+import org.apache.commons.collections4.CollectionUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class MultipleTableHiveSourceSplitEnumerator
+        implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
+
+    private final SourceSplitEnumerator.Context<FileSourceSplit> context;
+    private final Set<FileSourceSplit> pendingSplit;
+    private final Set<FileSourceSplit> assignedSplit;
+    private final Map<String, List<String>> filePathMap;
+
+    public MultipleTableHiveSourceSplitEnumerator(
+            SourceSplitEnumerator.Context<FileSourceSplit> context,
+            MultipleTableHiveSourceConfig multipleTableLocalFileSourceConfig) {
+        this.context = context;
+        this.filePathMap =
+                
multipleTableLocalFileSourceConfig.getHiveSourceConfigs().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        localFileSourceConfig ->
+                                                localFileSourceConfig
+                                                        .getCatalogTable()
+                                                        .getTableId()
+                                                        .toTablePath()
+                                                        .toString(),
+                                        HiveSourceConfig::getFilePaths));
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
+    }
+
+    public MultipleTableHiveSourceSplitEnumerator(
+            SourceSplitEnumerator.Context<FileSourceSplit> context,
+            MultipleTableHiveSourceConfig multipleTableLocalFileSourceConfig,
+            FileSourceState localFileSourceState) {
+        this(context, multipleTableLocalFileSourceConfig);
+        this.assignedSplit.addAll(localFileSourceState.getAssignedSplit());
+    }
+
+    @Override
+    public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
+        if (CollectionUtils.isEmpty(splits)) {
+            return;
+        }
+        pendingSplit.addAll(splits);
+        assignSplit(subtaskId);
+    }
+
+    @Override
+    public int currentUnassignedSplitSize() {
+        return pendingSplit.size();
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId) {}
+
+    @Override
+    public void registerReader(int subtaskId) {
+        for (Map.Entry<String, List<String>> filePathEntry : 
filePathMap.entrySet()) {
+            String tableId = filePathEntry.getKey();
+            List<String> filePaths = filePathEntry.getValue();
+            for (String filePath : filePaths) {
+                pendingSplit.add(new FileSourceSplit(tableId, filePath));
+            }
+        }
+        assignSplit(subtaskId);
+    }
+
+    @Override
+    public FileSourceState snapshotState(long checkpointId) {
+        return new FileSourceState(assignedSplit);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+        // do nothing.
+    }
+
+    private void assignSplit(int taskId) {
+        List<FileSourceSplit> currentTaskSplits = new ArrayList<>();
+        if (context.currentParallelism() == 1) {
+            // if parallelism == 1, we should assign all the splits to reader
+            currentTaskSplits.addAll(pendingSplit);
+        } else {
+            // if parallelism > 1, according to hashCode of split's id to 
determine whether to
+            // allocate the current task
+            for (FileSourceSplit fileSourceSplit : pendingSplit) {
+                int splitOwner =
+                        getSplitOwner(fileSourceSplit.splitId(), 
context.currentParallelism());
+                if (splitOwner == taskId) {
+                    currentTaskSplits.add(fileSourceSplit);
+                }
+            }
+        }
+        // assign splits
+        context.assignSplit(taskId, currentTaskSplits);
+        // save the state of assigned splits
+        assignedSplit.addAll(currentTaskSplits);
+        // remove the assigned splits from pending splits
+        currentTaskSplits.forEach(pendingSplit::remove);
+        log.info(
+                "SubTask {} is assigned to [{}]",
+                taskId,
+                currentTaskSplits.stream()
+                        .map(FileSourceSplit::splitId)
+                        .collect(Collectors.joining(",")));
+        context.signalNoMoreSplits(taskId);
+    }
+
+    private static int getSplitOwner(String tp, int numReaders) {
+        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    }
+
+    @Override
+    public void open() {
+        // do nothing
+    }
+
+    @Override
+    public void run() throws Exception {
+        // do nothing
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java
new file mode 100644
index 0000000000..cd9da5351e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/state/HiveSourceState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.state;
+
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.split.HiveSourceSplit;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class HiveSourceState implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Set<HiveSourceSplit> assignedSplit;
+
+    public HiveSourceState(Set<HiveSourceSplit> assignedSplit) {
+        this.assignedSplit = assignedSplit;
+    }
+
+    public Set<HiveSourceSplit> getAssignedSplit() {
+        return assignedSplit;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 6a1288b661..b3c463d804 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,15 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -47,36 +46,31 @@ public class HiveMetaStoreProxy {
     private HiveMetaStoreClient hiveMetaStoreClient;
     private static volatile HiveMetaStoreProxy INSTANCE = null;
 
-    private HiveMetaStoreProxy(Config config) {
-        String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
-
+    private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
+        String metastoreUri = 
readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set("hive.metastore.uris", metastoreUri);
         try {
-            HiveConf hiveConf = new HiveConf();
-            hiveConf.set("hive.metastore.uris", metastoreUri);
-            if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) {
-                String hiveSitePath = 
config.getString(HiveConfig.HIVE_SITE_PATH.key());
+            if 
(StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) {
+                String hiveSitePath = 
readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH);
                 hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
             }
-            if (HiveMetaStoreProxyUtils.enableKerberos(config)) {
+            if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithKerberos(
                                 new Configuration(),
-                                TypesafeConfigUtils.getConfig(
-                                        config,
-                                        
BaseSourceConfigOptions.KRB5_PATH.key(),
-                                        
BaseSourceConfigOptions.KRB5_PATH.defaultValue()),
-                                
config.getString(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key()),
-                                config.getString(
-                                        
BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key()),
+                                
readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH),
+                                
readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL),
+                                
readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH),
                                 (configuration, userGroupInformation) ->
                                         new HiveMetaStoreClient(hiveConf));
                 return;
             }
-            if (HiveMetaStoreProxyUtils.enableRemoteUser(config)) {
+            if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) {
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithRemoteUser(
                                 new Configuration(),
-                                
config.getString(BaseSourceConfigOptions.REMOTE_USER.key()),
+                                
readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER),
                                 (configuration, userGroupInformation) ->
                                         new HiveMetaStoreClient(hiveConf));
                 return;
@@ -95,7 +89,7 @@ public class HiveMetaStoreProxy {
                     String.format(
                             "Using this hive uris [%s], hive conf [%s] to 
initialize "
                                     + "hive metastore client instance failed",
-                            metastoreUri, 
config.getString(HiveConfig.HIVE_SITE_PATH.key()));
+                            metastoreUri, 
readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
             throw new HiveConnectorException(
                     
HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
         } catch (Exception e) {
@@ -106,11 +100,11 @@ public class HiveMetaStoreProxy {
         }
     }
 
-    public static HiveMetaStoreProxy getInstance(Config config) {
+    public static HiveMetaStoreProxy getInstance(ReadonlyConfig 
readonlyConfig) {
         if (INSTANCE == null) {
             synchronized (HiveMetaStoreProxy.class) {
                 if (INSTANCE == null) {
-                    INSTANCE = new HiveMetaStoreProxy(config);
+                    INSTANCE = new HiveMetaStoreProxy(readonlyConfig);
                 }
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
index fda221d886..f1474f7694 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtils.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 
 import lombok.experimental.UtilityClass;
@@ -26,11 +25,11 @@ import lombok.experimental.UtilityClass;
 @UtilityClass
 public class HiveMetaStoreProxyUtils {
 
-    public boolean enableKerberos(Config config) {
+    public boolean enableKerberos(ReadonlyConfig config) {
         boolean kerberosPrincipalEmpty =
-                
config.hasPath(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key());
+                
config.getOptional(BaseSourceConfigOptions.KERBEROS_PRINCIPAL).isPresent();
         boolean kerberosKeytabPathEmpty =
-                
config.hasPath(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key());
+                
config.getOptional(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH).isPresent();
         if (kerberosKeytabPathEmpty && kerberosPrincipalEmpty) {
             return true;
         }
@@ -43,7 +42,7 @@ public class HiveMetaStoreProxyUtils {
         throw new IllegalArgumentException("Please set kerberosKeytabPath");
     }
 
-    public boolean enableRemoteUser(Config config) {
-        return config.hasPath(BaseSourceConfigOptions.REMOTE_USER.key());
+    public boolean enableRemoteUser(ReadonlyConfig config) {
+        return 
config.getOptional(BaseSourceConfigOptions.REMOTE_USER).isPresent();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
new file mode 100644
index 0000000000..e4282db204
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+
+public class HiveTableUtils {
+
+    public static Table getTableInfo(ReadonlyConfig readonlyConfig) {
+        String table = readonlyConfig.get(HiveSourceOptions.TABLE_NAME);
+        TablePath tablePath = TablePath.of(table);
+        if (tablePath.getDatabaseName() == null || tablePath.getTableName() == 
null) {
+            throw new SeaTunnelRuntimeException(
+                    HiveConnectorErrorCode.HIVE_TABLE_NAME_ERROR, "Current 
table name is " + table);
+        }
+        HiveMetaStoreProxy hiveMetaStoreProxy = 
HiveMetaStoreProxy.getInstance(readonlyConfig);
+        Table tableInformation =
+                hiveMetaStoreProxy.getTable(tablePath.getDatabaseName(), 
tablePath.getTableName());
+        hiveMetaStoreProxy.close();
+        return tableInformation;
+    }
+
+    public static FileFormat parseFileFormat(Table table) {
+        String inputFormat = table.getSd().getInputFormat();
+        if (HiveConstants.TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+            return FileFormat.TEXT;
+        }
+        if (HiveConstants.PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+            return FileFormat.PARQUET;
+        }
+        if (HiveConstants.ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+            return FileFormat.ORC;
+        }
+        throw new HiveConnectorException(
+                CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
+                "Hive connector only support [text parquet orc] table now");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java
new file mode 100644
index 0000000000..7aac96c978
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+
+import java.util.LinkedHashMap;
+
+public class HiveTypeConvertor {
+
+    public static SeaTunnelDataType<?> covertHiveTypeToSeaTunnelType(String 
name, String hiveType) {
+        if (hiveType.contains("varchar")) {
+            return BasicType.STRING_TYPE;
+        }
+        if (hiveType.contains("char")) {
+            throw CommonError.convertToSeaTunnelTypeError(
+                    HiveConstants.CONNECTOR_NAME, PluginType.SOURCE, hiveType, 
name);
+        }
+        if (hiveType.contains("binary")) {
+            return PrimitiveByteArrayType.INSTANCE;
+        }
+        if (hiveType.contains("struct")) {
+            LinkedHashMap<String, Object> fields = new LinkedHashMap<>();
+            int start = hiveType.indexOf("<");
+            int end = hiveType.lastIndexOf(">");
+            String[] columns = hiveType.substring(start + 1, end).split(",");
+            for (String column : columns) {
+                String[] splits = column.split(":");
+                fields.put(
+                        splits[0], covertHiveTypeToSeaTunnelType(splits[0], 
splits[1]).toString());
+            }
+            return SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(
+                    name, JsonUtils.toJsonString(fields));
+        }
+        return 
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(name, hiveType);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java
deleted file mode 100644
index 3810085b36..0000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/TypeConvertTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.common.utils.ReflectionUtils;
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.Optional;
-
-public class TypeConvertTest {
-
-    @Test
-    void testWithUnsupportedType() {
-        Optional<Method> parseSchema =
-                ReflectionUtils.getDeclaredMethod(HiveSource.class, 
"parseSchema", Table.class);
-        Assertions.assertTrue(parseSchema.isPresent());
-        Table table = new Table();
-        table.setSd(new StorageDescriptor());
-        table.getSd().addToCols(new FieldSchema("test", "char", null));
-        InvocationTargetException exception =
-                Assertions.assertThrows(
-                        InvocationTargetException.class,
-                        () -> parseSchema.get().invoke(new HiveSource(), 
table));
-        Assertions.assertEquals(
-                "ErrorCode:[COMMON-16], ErrorDescription:['Hive' source 
unsupported convert type 'char' of 'test' to SeaTunnel data type.]",
-                exception.getCause().getMessage());
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
index ad952112c4..eb0afe9d4a 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyUtilsTest.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
 import org.junit.jupiter.api.Test;
 
 import lombok.SneakyThrows;
@@ -35,7 +37,7 @@ class HiveMetaStoreProxyUtilsTest {
 
     @Test
     void enableKerberos() {
-        Config config = parseConfig("/hive_without_kerberos.conf");
+        ReadonlyConfig config = parseConfig("/hive_without_kerberos.conf");
         assertFalse(HiveMetaStoreProxyUtils.enableKerberos(config));
         assertFalse(HiveMetaStoreProxyUtils.enableRemoteUser(config));
 
@@ -48,9 +50,10 @@ class HiveMetaStoreProxyUtilsTest {
     }
 
     @SneakyThrows
-    private Config parseConfig(String configFile) {
+    private ReadonlyConfig parseConfig(String configFile) {
         URL resource = 
HiveMetaStoreProxyUtilsTest.class.getResource(configFile);
         String filePath = Paths.get(resource.toURI()).toString();
-        return ConfigFactory.parseFile(new File(filePath));
+        Config config = ConfigFactory.parseFile(new File(filePath));
+        return ReadonlyConfig.fromConfig(config);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java
new file mode 100644
index 0000000000..ef87e083a4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTypeConvertorTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class HiveTypeConvertorTest {
+
+    @Test
+    void covertHiveTypeToSeaTunnelType() {
+        SeaTunnelRuntimeException exception =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () -> 
HiveTypeConvertor.covertHiveTypeToSeaTunnelType("test", "char"));
+        assertEquals(
+                "ErrorCode:[COMMON-16], ErrorDescription:['Hive' source 
unsupported convert type 'char' of 'test' to SeaTunnel data type.]",
+                exception.getMessage());
+    }
+
+    @Test
+    void convertHiveStructType() {
+        SeaTunnelDataType<?> structType =
+                HiveTypeConvertor.covertHiveTypeToSeaTunnelType(
+                        "structType", "struct<country:String,city:String>");
+        assertEquals(SqlType.ROW, structType.getSqlType());
+        SeaTunnelRowType seaTunnelRowType = (SeaTunnelRowType) structType;
+        assertEquals(BasicType.STRING_TYPE, seaTunnelRowType.getFieldType(0));
+        assertEquals(BasicType.STRING_TYPE, seaTunnelRowType.getFieldType(0));
+    }
+}
diff --git 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
index 5b46d81201..6d59ff27f5 100644
--- 
a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
+++ 
b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java
@@ -64,6 +64,7 @@ public class ConnectorSpecificationCheckTest {
         // hive-exec.jar. We need to check manually.
         List<String> blockList = new ArrayList<>();
         blockList.add("HiveSourceFactory");
+        blockList.add("HiveSinkFactory");
 
         for (TableSourceFactory factory : sourceFactories) {
             if (ReflectionUtils.getDeclaredMethod(
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml 
b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index 3256bdde88..1f22a56658 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -67,5 +67,55 @@
             <artifactId>connector-assert</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hive</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>2.3.9</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-web</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>

Reply via email to