This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a357dca36 [Improve][Connector-V2] Refactor hive source & sink 
connector (#2708)
a357dca36 is described below

commit a357dca36503c9ef49bdfb15c7a3bfeaa50ca0b8
Author: TyrantLucifer <[email protected]>
AuthorDate: Thu Sep 15 13:49:02 2022 +0800

    [Improve][Connector-V2] Refactor hive source & sink connector (#2708)
---
 docs/en/connector-v2/sink/Hive.md                  |  34 ++--
 docs/en/connector-v2/source/Hive.md                |  47 ++++++
 seatunnel-connectors-v2/connector-hive/pom.xml     | 131 +++------------
 .../connectors/seatunnel/hive/config/Config.java   |  21 ---
 .../connectors/seatunnel/hive/config/Constant.java |  23 ---
 .../seatunnel/hive/config/HiveConfig.java          |  47 ++++++
 .../seatunnel/hive/config/SourceConfig.java        |  29 ----
 .../hive/exception/HivePluginException.java        |  29 ----
 .../seatunnel/hive/sink/BaseHiveCommitInfo.java    |  36 ----
 .../hive/sink/HiveAggregatedCommitInfo.java        |  36 ----
 .../seatunnel/hive/sink/HiveCommitInfo.java        |  36 ----
 .../connectors/seatunnel/hive/sink/HiveSink.java   | 166 +++++++++---------
 .../hive/sink/HiveSinkAggregatedCommitter.java     | 139 ----------------
 .../seatunnel/hive/sink/HiveSinkConfig.java        | 155 -----------------
 .../seatunnel/hive/sink/HiveSinkState.java         |  30 ----
 .../seatunnel/hive/sink/HiveSinkWriter.java        | 160 ------------------
 .../seatunnel/hive/source/HadoopConf.java          |  34 ----
 .../seatunnel/hive/source/HiveSource.java          | 100 +++++------
 .../seatunnel/hive/source/HiveSourceReader.java    |  98 -----------
 .../seatunnel/hive/source/HiveSourceSplit.java     |  36 ----
 .../hive/source/HiveSourceSplitEnumerator.java     | 126 --------------
 .../seatunnel/hive/source/HiveSourceState.java     |  35 ----
 .../file/reader/format/AbstractReadStrategy.java   |  84 ----------
 .../source/file/reader/format/OrcReadStrategy.java | 168 -------------------
 .../file/reader/format/ParquetReadStrategy.java    | 185 ---------------------
 .../source/file/reader/format/ReadStrategy.java    |  42 -----
 .../file/reader/format/ReadStrategyFactory.java    |  38 -----
 .../file/reader/format/TextReadStrategy.java       |  56 -------
 .../hive/source/file/reader/type/FileTypeEnum.java |  48 ------
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  45 ++++-
 .../seatunnel/hive/sink/HiveSinkConfigTest.java    |  51 ------
 31 files changed, 293 insertions(+), 1972 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md 
b/docs/en/connector-v2/sink/Hive.md
index 56b49ad7b..59d8dc5c3 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -22,22 +22,22 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ## Options
 
-| name                              | type   | required | default value        
                                         |
-| --------------------------------- | ------ | -------- | 
------------------------------------------------------------- |
-| hive_table_name                   | string | yes      | -                    
                                         |
-| hive_metastore_uris               | string | yes      | -                    
                                         |
-| partition_by                      | array  | no       | -                    
                                         |
-| sink_columns                      | array  | no       | When this parameter 
is empty, all fields are sink columns     |
-| is_enable_transaction             | boolean| no       | true                 
                                         |
-| save_mode                         | string | no       | "append"             
                                         |
+| name                  | type   | required | default value                    
                             |
+|-----------------------| ------ | -------- | 
------------------------------------------------------------- |
+| table_name            | string | yes      | -                                
                             |
+| metastore_uri         | string | yes      | -                                
                             |
+| partition_by          | array  | no       | -                                
                             |
+| sink_columns          | array  | no       | When this parameter is empty, 
all fields are sink columns     |
+| is_enable_transaction | boolean| no       | true                             
                             |
+| save_mode             | string | no       | "append"                         
                             |
 
-### hive_table_name [string]
+### table_name [string]
 
 Target Hive table name eg: db1.table1
 
-### hive_metastore_uris [string]
+### metastore_uri [string]
 
-Hive metastore uris
+Hive metastore uri
 
 ### partition_by [array]
 
@@ -64,13 +64,9 @@ Streaming Job not support `overwrite`.
 
 ```bash
 
-Hive {
-    hive_table_name="db1.table1"
-    hive_metastore_uris="thrift://localhost:9083"
-    partition_by=["age"]
-    sink_columns=["name","age"]
-    is_enable_transaction=true
-    save_mode="append"
-}
+  Hive {
+    table_name = "default.seatunnel_orc"
+    metastore_uri = "thrift://namenode001:9083"
+  }
 
 ```
diff --git a/docs/en/connector-v2/source/Hive.md 
b/docs/en/connector-v2/source/Hive.md
new file mode 100644
index 000000000..dcc4030f5
--- /dev/null
+++ b/docs/en/connector-v2/source/Hive.md
@@ -0,0 +1,47 @@
+# Hive
+
+> Hive source connector
+
+## Description
+
+Read data from Hive.
+
+In order to use this connector, You must ensure your spark/flink cluster 
already integrated hive. The tested hive version is 2.3.9.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+  - [x] text
+  - [x] parquet
+  - [x] orc
+
+## Options
+
+| name                  | type   | required | default value                    
                             |
+|-----------------------| ------ | -------- | 
------------------------------------------------------------- |
+| table_name            | string | yes      | -                                
                             |
+| metastore_uri         | string | yes      | -                                
                             |
+
+### table_name [string]
+
+Target Hive table name eg: db1.table1
+
+### metastore_uri [string]
+
+Hive metastore uri
+
+## Example
+
+```bash
+
+  Hive {
+    table_name = "default.seatunnel_orc"
+    metastore_uri = "thrift://namenode001:9083"
+  }
+
+```
diff --git a/seatunnel-connectors-v2/connector-hive/pom.xml 
b/seatunnel-connectors-v2/connector-hive/pom.xml
index a00016488..39c6f6429 100644
--- a/seatunnel-connectors-v2/connector-hive/pom.xml
+++ b/seatunnel-connectors-v2/connector-hive/pom.xml
@@ -30,146 +30,61 @@
     <artifactId>connector-hive</artifactId>
     
     <properties>
-        <hive.exec.version>2.3.9</hive.exec.version>
-        <flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
-        <orc.version>1.5.6</orc.version>
-        <parquet-avro.version>1.10.0</parquet-avro.version>
-        <commons.collecton4.version>4.4</commons.collecton4.version>
-        <commons.lang3.version>3.4</commons.lang3.version>
-        <janino.version>3.1.6</janino.version>
+        <hive.metastore.version>2.3.9</hive.metastore.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-            <version>${hive.exec.version}</version>
-            <scope>provided</scope>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-hadoop</artifactId>
+            <version>${project.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-1.2-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-web</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apapche.hadoop</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.joshelser</groupId>
-                    
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-hdfs</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-shaded-hadoop-2</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2</artifactId>
-            <version>${flink.hadoop.version}</version>
             <scope>provided</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-core-base</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <version>${commons.lang3.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.orc</groupId>
-            <artifactId>orc-core</artifactId>
-            <version>${orc.version}</version>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.metastore.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
+                    <artifactId>log4j-1.2-api</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-hdfs</artifactId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
                 </exclusion>
                 <exclusion>
+                    <artifactId>log4j-web</artifactId>
                     <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>*</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apapche.hadoop</groupId>
-                    <artifactId>*</artifactId>
+                    <artifactId>parquet-hadoop-bundle</artifactId>
+                    <groupId>org.apache.parquet</groupId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.apache.curator</groupId>
-                    <artifactId>*</artifactId>
+                    <artifactId>jdk.tools</artifactId>
+                    <groupId>jdk.tools</groupId>
                 </exclusion>
             </exclusions>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.parquet</groupId>
-            <artifactId>parquet-avro</artifactId>
-            <version>${parquet-avro.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-collections4</artifactId>
-            <version>${commons.collecton4.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-file-hadoop</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.codehaus.janino</groupId>
-            <artifactId>janino</artifactId>
-            <version>${janino.version}</version>
-        </dependency>
     </dependencies>
+
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
deleted file mode 100644
index 2177ffbde..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Config.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.config;
-
-public class Config {
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
deleted file mode 100644
index dd0b2ab54..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/Constant.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.config;
-
-public class Constant {
-    public static final String HIVE_RESULT_TABLE_NAME = "hive_table_name";
-    public static final String HIVE_METASTORE_URIS = "hive_metastore_uris";
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
new file mode 100644
index 000000000..16ab34b5e
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.config;
+
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+public class HiveConfig {
+    public static final String TABLE_NAME = "table_name";
+    public static final String METASTORE_URI = "metastore_uri";
+    public static final String TEXT_INPUT_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextInputFormat";
+    public static final String TEXT_OUTPUT_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
+    public static final String PARQUET_INPUT_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
+    public static final String PARQUET_OUTPUT_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
+    public static final String ORC_INPUT_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+    public static final String ORC_OUTPUT_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+
+    public static Pair<String[], Table> getTableInfo(Config config) {
+        String table = config.getString(TABLE_NAME);
+        String[] splits = table.split("\\.");
+        if (splits.length != 2) {
+            throw new RuntimeException("Please config " + TABLE_NAME + " as 
db.table format");
+        }
+        HiveMetaStoreProxy hiveMetaStoreProxy = 
HiveMetaStoreProxy.getInstance(config);
+        Table tableInformation = hiveMetaStoreProxy.getTable(splits[0], 
splits[1]);
+        return Pair.of(splits, tableInformation);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/SourceConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/SourceConfig.java
deleted file mode 100644
index 705440316..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/SourceConfig.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.config;
-
-import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
-
-public class SourceConfig {
-
-    public static final String FILE_TYPE = "file.type";
-
-    public static final String DEFAULT_FS = FS_DEFAULT_NAME_KEY;
-
-    public static final String FILE_PATH = "file.path";
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HivePluginException.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HivePluginException.java
deleted file mode 100644
index 8e1c22548..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/exception/HivePluginException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.exception;
-
-public class HivePluginException extends Exception{
-
-    public HivePluginException(String message) {
-        super(message);
-    }
-
-    public HivePluginException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/BaseHiveCommitInfo.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/BaseHiveCommitInfo.java
deleted file mode 100644
index 7422ecf4c..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/BaseHiveCommitInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class BaseHiveCommitInfo implements Serializable {
-
-    private String hiveMetastoreUris;
-
-    private Table table;
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
deleted file mode 100644
index 1311c925d..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveAggregatedCommitInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-@Data
-@AllArgsConstructor
-public class HiveAggregatedCommitInfo extends BaseHiveCommitInfo {
-
-    private FileAggregatedCommitInfo fileAggregatedCommitInfo;
-
-    public HiveAggregatedCommitInfo(FileAggregatedCommitInfo 
fileAggregatedCommitInfo, String hiveMetastoreUris, Table table) {
-        super(hiveMetastoreUris, table);
-        this.fileAggregatedCommitInfo = fileAggregatedCommitInfo;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
deleted file mode 100644
index 509352f46..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveCommitInfo.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-@Data
-@AllArgsConstructor
-public class HiveCommitInfo extends BaseHiveCommitInfo {
-
-    private FileCommitInfo fileCommitInfo;
-
-    public HiveCommitInfo(FileCommitInfo fileCommitInfo, String 
hiveMetastoreUris, Table table) {
-        super(hiveMetastoreUris, table);
-        this.fileCommitInfo = fileCommitInfo;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 36bd50ca3..9adae1158 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -17,40 +17,50 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
-import org.apache.seatunnel.api.common.JobContext;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIELD_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
+import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_OUTPUT_FORMAT_CLASSNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_OUTPUT_FORMAT_CLASSNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_OUTPUT_FORMAT_CLASSNAME;
+
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.serialization.DefaultSerializer;
-import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
 
-import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
-import java.util.Optional;
+import java.util.Map;
+import java.util.stream.Collectors;
 
-/**
- * Hive Sink implementation by using SeaTunnel sink API.
- * This class contains the method to create {@link HiveSinkWriter} and {@link 
HiveSinkAggregatedCommitter}.
- */
 @AutoService(SeaTunnelSink.class)
-public class HiveSink implements SeaTunnelSink<SeaTunnelRow, HiveSinkState, 
HiveCommitInfo, HiveAggregatedCommitInfo> {
-
-    private Config config;
-    private String jobId;
-    private Long checkpointId;
-    private SeaTunnelRowType seaTunnelRowTypeInfo;
-    private JobContext jobContext;
-    private HiveSinkConfig hiveSinkConfig;
+public class HiveSink extends BaseFileSink {
+    private String dbName;
+    private String tableName;
+    private Table tableInformation;
 
     @Override
     public String getPluginName() {
@@ -58,73 +68,67 @@ public class HiveSink implements 
SeaTunnelSink<SeaTunnelRow, HiveSinkState, Hive
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowTypeInfo) {
-        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
-        this.hiveSinkConfig = new HiveSinkConfig(config, seaTunnelRowTypeInfo);
-    }
+    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        super.setTypeInfo(seaTunnelRowType);
+        HiveMetaStoreProxy hiveMetaStoreProxy = 
HiveMetaStoreProxy.getInstance(pluginConfig);
+        // --------------------Check textFileSinkConfig with the hive table 
info-------------------
+        List<FieldSchema> fields = hiveMetaStoreProxy.getTableFields(dbName, 
tableName);
+        List<FieldSchema> partitionKeys = tableInformation.getPartitionKeys();
+
+        // Remove partitionKeys from table fields
+        List<FieldSchema> fieldNotContainPartitionKey = 
fields.stream().filter(filed -> 
!partitionKeys.contains(filed)).collect(Collectors.toList());
+
+        // check fields size must be the same as sinkColumnList size
+        if (fieldNotContainPartitionKey.size() != 
textFileSinkConfig.getSinkColumnList().size()) {
+            throw new RuntimeException("sink columns size must same as hive 
table field size");
+        }
 
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
-        return this.seaTunnelRowTypeInfo;
+        // check hivePartitionFieldList size must be the same as partitionFieldList size
+        if (partitionKeys.size() != 
textFileSinkConfig.getPartitionFieldList().size()) {
+            throw new RuntimeException("partition by columns size must same as 
hive table partition columns size");
+        }
+        hiveMetaStoreProxy.close();
     }
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.config = pluginConfig;
-        this.checkpointId = 1L;
-    }
-
-    @Override
-    public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> 
createWriter(SinkWriter.Context context) throws IOException {
-        if (!jobContext.getJobMode().equals(JobMode.BATCH) && 
hiveSinkConfig.getTextFileSinkConfig().getSaveMode().equals(SaveMode.OVERWRITE))
 {
-            throw new RuntimeException("only batch job can overwrite hive 
table");
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HiveConfig.METASTORE_URI, HiveConfig.TABLE_NAME);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
         }
-
-        if 
(!this.getSinkConfig().getTextFileSinkConfig().isEnableTransaction()) {
-            throw new RuntimeException("Hive Sink Connector only support 
transaction now");
+        Pair<String[], Table> tableInfo = 
HiveConfig.getTableInfo(pluginConfig);
+        dbName = tableInfo.getLeft()[0];
+        tableName = tableInfo.getLeft()[1];
+        tableInformation = tableInfo.getRight();
+        String outputFormat = tableInformation.getSd().getOutputFormat();
+        if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
+            Map<String, String> parameters = 
tableInformation.getSd().getSerdeInfo().getParameters();
+            pluginConfig = pluginConfig.withValue(FILE_FORMAT, 
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
+                    .withValue(FIELD_DELIMITER, 
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
+                    .withValue(ROW_DELIMITER, 
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
+        } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
+            pluginConfig = pluginConfig.withValue(FILE_FORMAT, 
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+        } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
+            pluginConfig = pluginConfig.withValue(FILE_FORMAT, 
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+        } else {
+            throw new RuntimeException("Only support [text parquet orc] file 
now");
         }
-        return new HiveSinkWriter(seaTunnelRowTypeInfo,
-            config,
-            context,
-            getSinkConfig(),
-            jobId);
-    }
+        pluginConfig = 
pluginConfig.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, 
ConfigValueFactory.fromAnyRef(false))
+                .withValue(FILE_NAME_EXPRESSION, 
ConfigValueFactory.fromAnyRef("${transactionId}"))
+                .withValue(PATH, 
ConfigValueFactory.fromAnyRef(tableInformation.getSd().getLocation()));
 
-    @Override
-    public SinkWriter<SeaTunnelRow, HiveCommitInfo, HiveSinkState> 
restoreWriter(SinkWriter.Context context, List<HiveSinkState> states) throws 
IOException {
-        return new HiveSinkWriter(seaTunnelRowTypeInfo, config, context, 
hiveSinkConfig, jobId, states);
-    }
-
-    @Override
-    public void setJobContext(JobContext jobContext) {
-        this.jobContext = jobContext;
-        this.jobId = jobContext.getJobId();
-    }
-
-    @Override
-    public Optional<SinkAggregatedCommitter<HiveCommitInfo, 
HiveAggregatedCommitInfo>> createAggregatedCommitter() throws IOException {
-        return Optional.of(new HiveSinkAggregatedCommitter());
-    }
-
-    @Override
-    public Optional<Serializer<HiveSinkState>> getWriterStateSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
-    @Override
-    public Optional<Serializer<HiveAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
-    @Override
-    public Optional<Serializer<HiveCommitInfo>> getCommitInfoSerializer() {
-        return Optional.of(new DefaultSerializer<>());
-    }
-
-    private HiveSinkConfig getSinkConfig() {
-        if (this.hiveSinkConfig == null && (this.seaTunnelRowTypeInfo != null 
&& this.config != null)) {
-            this.hiveSinkConfig = new HiveSinkConfig(config, 
seaTunnelRowTypeInfo);
+        if (!pluginConfig.hasPath(SAVE_MODE) || 
StringUtils.isBlank(pluginConfig.getString(SAVE_MODE))) {
+            pluginConfig = pluginConfig.withValue(SAVE_MODE, 
ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
+        }
+        String hdfsLocation = tableInformation.getSd().getLocation();
+        try {
+            URI uri = new URI(hdfsLocation);
+            String path = uri.getPath();
+            pluginConfig = pluginConfig.withValue(PATH, 
ConfigValueFactory.fromAnyRef(path));
+            hadoopConf = new HadoopConf(hdfsLocation.replace(path, ""));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Get hdfs cluster address failed, 
please check.", e);
         }
-        return this.hiveSinkConfig;
+        super.prepare(pluginConfig);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
deleted file mode 100644
index 6532bad07..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkAggregatedCommitter.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.util.HdfsUtils;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HiveSinkAggregatedCommitter implements 
SinkAggregatedCommitter<HiveCommitInfo, HiveAggregatedCommitInfo> {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(HiveSinkAggregatedCommitter.class);
-
-    @Override
-    public List<HiveAggregatedCommitInfo> 
commit(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) throws 
IOException {
-        LOGGER.info("=============================agg 
commit=================================");
-        if (CollectionUtils.isEmpty(aggregatedCommitInfoList)) {
-            return null;
-        }
-        List<HiveAggregatedCommitInfo> errorAggregatedCommitInfoList = new 
ArrayList<>();
-        HiveMetaStoreProxy hiveMetaStoreProxy = new 
HiveMetaStoreProxy(aggregatedCommitInfoList.get(0).getHiveMetastoreUris());
-        HiveMetaStoreClient hiveMetaStoreClient = 
hiveMetaStoreProxy.getHiveMetaStoreClient();
-        try {
-            aggregatedCommitInfoList.forEach(aggregateCommitInfo -> {
-                try {
-                    for (Map.Entry<String, Map<String, String>> entry : 
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
 {
-                        // rollback the file
-                        for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
-                            HdfsUtils.renameFile(mvFileEntry.getKey(), 
mvFileEntry.getValue(), true);
-                        }
-                        // delete the transaction dir
-                        HdfsUtils.deleteFile(entry.getKey());
-                    }
-                    // add hive partition
-                    
aggregateCommitInfo.getFileAggregatedCommitInfo().getPartitionDirAndValsMap().entrySet().forEach(entry
 -> {
-                        Partition part = new Partition();
-                        
part.setDbName(aggregateCommitInfo.getTable().getDbName());
-                        
part.setTableName(aggregateCommitInfo.getTable().getTableName());
-                        part.setValues(entry.getValue());
-                        part.setParameters(new HashMap<>());
-                        
part.setSd(aggregateCommitInfo.getTable().getSd().deepCopy());
-                        
part.getSd().setSerdeInfo(aggregateCommitInfo.getTable().getSd().getSerdeInfo());
-                        
part.getSd().setLocation(aggregateCommitInfo.getTable().getSd().getLocation() + 
"/" + entry.getKey());
-                        try {
-                            hiveMetaStoreClient.add_partition(part);
-                        } catch (TException e) {
-                            throw new RuntimeException(e);
-                        }
-                    });
-                } catch (Exception e) {
-                    LOGGER.error("commit aggregateCommitInfo error ", e);
-                    errorAggregatedCommitInfoList.add(aggregateCommitInfo);
-                }
-            });
-        } finally {
-            hiveMetaStoreClient.close();
-        }
-
-        return errorAggregatedCommitInfoList;
-    }
-
-    @Override
-    public HiveAggregatedCommitInfo combine(List<HiveCommitInfo> commitInfos) {
-        if (CollectionUtils.isEmpty(commitInfos)) {
-            return null;
-        }
-        Map<String, Map<String, String>> aggregateCommitInfo = new HashMap<>();
-        Map<String, List<String>> partitionDirAndValsMap = new HashMap<>();
-        commitInfos.forEach(commitInfo -> {
-            Map<String, String> needMoveFileMap = 
aggregateCommitInfo.computeIfAbsent(commitInfo.getFileCommitInfo().getTransactionDir(),
 k -> new HashMap<>());
-            
needMoveFileMap.putAll(commitInfo.getFileCommitInfo().getNeedMoveFiles());
-            Set<Map.Entry<String, List<String>>> entries = 
commitInfo.getFileCommitInfo().getPartitionDirAndValsMap().entrySet();
-            if (!CollectionUtils.isEmpty(entries)) {
-                
partitionDirAndValsMap.putAll(commitInfo.getFileCommitInfo().getPartitionDirAndValsMap());
-            }
-        });
-        return new HiveAggregatedCommitInfo(
-            new FileAggregatedCommitInfo(aggregateCommitInfo, 
partitionDirAndValsMap),
-            commitInfos.get(0).getHiveMetastoreUris(),
-            commitInfos.get(0).getTable());
-    }
-
-    @Override
-    public void abort(List<HiveAggregatedCommitInfo> aggregatedCommitInfoList) 
throws Exception {
-        if (aggregatedCommitInfoList == null || 
aggregatedCommitInfoList.size() == 0) {
-            return;
-        }
-        aggregatedCommitInfoList.stream().forEach(aggregateCommitInfo -> {
-            try {
-                for (Map.Entry<String, Map<String, String>> entry : 
aggregateCommitInfo.getFileAggregatedCommitInfo().getTransactionMap().entrySet())
 {
-                    // rollback the file
-                    for (Map.Entry<String, String> mvFileEntry : 
entry.getValue().entrySet()) {
-                        if (HdfsUtils.fileExist(mvFileEntry.getValue()) && 
!HdfsUtils.fileExist(mvFileEntry.getKey())) {
-                            HdfsUtils.renameFile(mvFileEntry.getValue(), 
mvFileEntry.getKey(), true);
-                        }
-                    }
-                    // delete the transaction dir
-                    HdfsUtils.deleteFile(entry.getKey());
-
-                    // The partitions that have been added will be preserved 
and will not be deleted
-                }
-            } catch (IOException e) {
-                LOGGER.error("abort aggregateCommitInfo error ", e);
-            }
-        });
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
deleted file mode 100644
index b05b8d2e4..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfig.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FIELD_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.FILE_NAME_EXPRESSION;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.IS_PARTITION_FIELD_WRITE_IN_FILE;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.PATH;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.ROW_DELIMITER;
-import static 
org.apache.seatunnel.connectors.seatunnel.file.config.Constant.SAVE_MODE;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_METASTORE_URIS;
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.Constant.HIVE_RESULT_TABLE_NAME;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.Constant;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.config.SaveMode;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
-import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import lombok.Data;
-import lombok.NonNull;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-@Data
-public class HiveSinkConfig implements Serializable {
-    private static final String TEXT_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";
-    private static final String PARQUET_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
-    private static final String ORC_FORMAT_CLASSNAME = 
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
-    private String hiveTableName;
-    private List<String> hivePartitionFieldList;
-    private String hiveMetaUris;
-
-    private String dbName;
-
-    private String tableName;
-
-    private Table table;
-
-    private TextFileSinkConfig textFileSinkConfig;
-
-    public HiveSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo) {
-        
checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
-
-        if (config.hasPath(HIVE_RESULT_TABLE_NAME) && 
!StringUtils.isBlank(config.getString(HIVE_RESULT_TABLE_NAME))) {
-            this.hiveTableName = config.getString(HIVE_RESULT_TABLE_NAME);
-        }
-        checkNotNull(hiveTableName);
-
-        if (config.hasPath(HIVE_METASTORE_URIS) && 
!StringUtils.isBlank(config.getString(HIVE_METASTORE_URIS))) {
-            this.hiveMetaUris = config.getString(HIVE_METASTORE_URIS);
-        }
-        checkNotNull(hiveMetaUris);
-
-        String[] dbAndTableName = hiveTableName.split("\\.");
-        if (dbAndTableName == null || dbAndTableName.length != 2) {
-            throw new RuntimeException("Please config " + 
HIVE_RESULT_TABLE_NAME + " as db.table format");
-        }
-        this.dbName = dbAndTableName[0];
-        this.tableName = dbAndTableName[1];
-        HiveMetaStoreProxy hiveMetaStoreProxy = new 
HiveMetaStoreProxy(hiveMetaUris);
-        HiveMetaStoreClient hiveMetaStoreClient = 
hiveMetaStoreProxy.getHiveMetaStoreClient();
-
-        try {
-            table = hiveMetaStoreClient.getTable(dbName, tableName);
-            String outputFormat = table.getSd().getOutputFormat();
-            Map<String, String> parameters = 
table.getSd().getSerdeInfo().getParameters();
-            if (TEXT_FORMAT_CLASSNAME.equals(outputFormat)) {
-                config = config.withValue(FILE_FORMAT, 
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
-                        .withValue(FIELD_DELIMITER, 
ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
-                        .withValue(ROW_DELIMITER, 
ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
-            } else if (PARQUET_FORMAT_CLASSNAME.equals(outputFormat)) {
-                config = config.withValue(FILE_FORMAT, 
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
-            } else if (ORC_FORMAT_CLASSNAME.equals(outputFormat)) {
-                config = config.withValue(FILE_FORMAT, 
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
-            } else {
-                throw new RuntimeException("Only support [text parquet orc] 
file now");
-            }
-
-            config = config.withValue(IS_PARTITION_FIELD_WRITE_IN_FILE, 
ConfigValueFactory.fromAnyRef(false))
-                .withValue(FILE_NAME_EXPRESSION, 
ConfigValueFactory.fromAnyRef("${transactionId}"))
-                .withValue(PATH, 
ConfigValueFactory.fromAnyRef(table.getSd().getLocation()));
-
-            if (!config.hasPath(SAVE_MODE) || 
StringUtils.isBlank(config.getString(Constant.SAVE_MODE))) {
-                config = config.withValue(SAVE_MODE, 
ConfigValueFactory.fromAnyRef(SaveMode.APPEND.toString()));
-            }
-
-            this.textFileSinkConfig = new TextFileSinkConfig(config, 
seaTunnelRowTypeInfo);
-
-            // --------------------Check textFileSinkConfig with the hive 
table info-------------------
-            List<FieldSchema> fields = 
hiveMetaStoreClient.getFields(dbAndTableName[0], dbAndTableName[1]);
-            List<FieldSchema> partitionKeys = table.getPartitionKeys();
-
-            // Remove partitionKeys from table fields
-            List<FieldSchema> fieldNotContainPartitionKey = 
fields.stream().filter(filed -> 
!partitionKeys.contains(filed)).collect(Collectors.toList());
-
-            // check fields size must same as sinkColumnList size
-            if (fieldNotContainPartitionKey.size() != 
textFileSinkConfig.getSinkColumnList().size()) {
-                throw new RuntimeException("sink columns size must same as 
hive table field size");
-            }
-
-            // check hivePartitionFieldList size must same as 
partitionFieldList size
-            if (partitionKeys.size() != 
textFileSinkConfig.getPartitionFieldList().size()) {
-                throw new RuntimeException("partition by columns size must 
same as hive table partition columns size");
-            }
-
-            // --------------------Check textFileSinkConfig with the hive 
table info end----------------
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        } finally {
-            hiveMetaStoreClient.close();
-        }
-
-        // hive only support append or overwrite
-        if (!this.textFileSinkConfig.getSaveMode().equals(SaveMode.APPEND) && 
!this.textFileSinkConfig.getSaveMode().equals(SaveMode.OVERWRITE)) {
-            throw new RuntimeException("hive only support append or overwrite 
save mode");
-        }
-    }
-
-    public TextFileSinkConfig getTextFileSinkConfig() {
-        return textFileSinkConfig;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
deleted file mode 100644
index a104151c3..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class HiveSinkState implements Serializable {
-    private String transactionId;
-    private Long checkpointId;
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
deleted file mode 100644
index 69283a55a..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkWriter.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.HdfsFileSinkPlugin;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileCommitInfo;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.FileSinkState;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkPartitionDirNameGenerator;
-import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import lombok.NonNull;
-import org.apache.commons.collections4.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-public class HiveSinkWriter implements SinkWriter<SeaTunnelRow, 
HiveCommitInfo, HiveSinkState> {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(HiveSinkWriter.class);
-
-    private SeaTunnelRowType seaTunnelRowTypeInfo;
-    private Config pluginConfig;
-    private Context context;
-    private String jobId;
-
-    private TransactionStateFileWriter fileWriter;
-
-    private HiveSinkConfig hiveSinkConfig;
-
-    public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
-                          @NonNull Config pluginConfig,
-                          @NonNull SinkWriter.Context context,
-                          @NonNull HiveSinkConfig hiveSinkConfig,
-                          @NonNull String jobId) {
-        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
-        this.pluginConfig = pluginConfig;
-        this.context = context;
-        this.jobId = jobId;
-        this.hiveSinkConfig = hiveSinkConfig;
-        this.fileWriter = createFileWriter();
-
-        fileWriter.beginTransaction(1L);
-    }
-
-    public HiveSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
-                          @NonNull Config pluginConfig,
-                          @NonNull SinkWriter.Context context,
-                          @NonNull HiveSinkConfig hiveSinkConfig,
-                          @NonNull String jobId,
-                          @NonNull List<HiveSinkState> hiveSinkStates) {
-        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
-        this.pluginConfig = pluginConfig;
-        this.context = context;
-        this.jobId = jobId;
-        this.hiveSinkConfig = hiveSinkConfig;
-        this.fileWriter = createFileWriter();
-
-        // Rollback dirty transaction
-        if (hiveSinkStates.size() > 0) {
-            List<String> transactionAfter = 
fileWriter.getTransactionAfter(hiveSinkStates.get(0).getTransactionId());
-            fileWriter.abortTransactions(transactionAfter);
-        }
-        fileWriter.beginTransaction(hiveSinkStates.get(0).getCheckpointId() + 
1);
-    }
-
-    @Override
-    public void write(SeaTunnelRow element) throws IOException {
-        fileWriter.write(element);
-    }
-
-    @Override
-    public Optional<HiveCommitInfo> prepareCommit() throws IOException {
-        Optional<FileCommitInfo> fileCommitInfoOptional = 
fileWriter.prepareCommit();
-        if (fileCommitInfoOptional.isPresent()) {
-            FileCommitInfo fileCommitInfo = fileCommitInfoOptional.get();
-            return Optional.of(new HiveCommitInfo(fileCommitInfo, 
hiveSinkConfig.getHiveMetaUris(), this.hiveSinkConfig.getTable()));
-        }
-        return Optional.empty();
-    }
-
-    @Override
-    public void close() throws IOException {
-        fileWriter.finishAndCloseWriteFile();
-    }
-
-    @Override
-    public List<HiveSinkState> snapshotState(long checkpointId) throws 
IOException {
-        List<FileSinkState> fileSinkStates = 
fileWriter.snapshotState(checkpointId);
-        if (!CollectionUtils.isEmpty(fileSinkStates)) {
-            return fileSinkStates.stream().map(state ->
-                    new HiveSinkState(state.getTransactionId(), 
state.getCheckpointId()))
-                .collect(Collectors.toList());
-        }
-        return Collections.emptyList();
-    }
-
-    @Override
-    public void abortPrepare() {
-        fileWriter.abortTransaction();
-    }
-
-    private TransactionStateFileWriter createFileWriter() {
-        SinkFileSystemPlugin sinkFileSystemPlugin = new HdfsFileSinkPlugin();
-        Optional<TransactionStateFileWriter> transactionStateFileWriterOpt = 
sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
-                getFilenameGenerator(),
-                getPartitionDirNameGenerator(),
-                
this.hiveSinkConfig.getTextFileSinkConfig().getSinkColumnsIndexInRow(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getTmpPath(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getPath(),
-                this.jobId,
-                this.context.getIndexOfSubtask(),
-                
this.hiveSinkConfig.getTextFileSinkConfig().getFieldDelimiter(),
-                this.hiveSinkConfig.getTextFileSinkConfig().getRowDelimiter(),
-                sinkFileSystemPlugin.getFileSystem().get());
-        if (!transactionStateFileWriterOpt.isPresent()) {
-            throw new RuntimeException("A TransactionStateFileWriter is need");
-        }
-        return transactionStateFileWriterOpt.get();
-    }
-
-    private FileSinkTransactionFileNameGenerator getFilenameGenerator() {
-        return new FileSinkTransactionFileNameGenerator(
-                this.hiveSinkConfig.getTextFileSinkConfig().getFileFormat(),
-                
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameExpression(),
-                
this.hiveSinkConfig.getTextFileSinkConfig().getFileNameTimeFormat());
-    }
-
-    private FileSinkPartitionDirNameGenerator getPartitionDirNameGenerator() {
-        return new FileSinkPartitionDirNameGenerator(
-                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldList(),
-                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionFieldsIndexInRow(),
-                
this.hiveSinkConfig.getTextFileSinkConfig().getPartitionDirExpression());
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HadoopConf.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HadoopConf.java
deleted file mode 100644
index 31dddf7b4..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HadoopConf.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class HadoopConf implements Serializable {
-
-    private String hdfsNameKey;
-
-    private String fsHdfsImpl = "org.apache.hadoop.hdfs.DistributedFileSystem";
-
-    public HadoopConf(String hdfsNameKey) {
-        this.hdfsNameKey = hdfsNameKey;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 1f1a8c3fb..92173892e 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -17,42 +17,34 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.source;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig.FILE_PATH;
-import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.ORC_INPUT_FORMAT_CLASSNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.PARQUET_INPUT_FORMAT_CLASSNAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig.TEXT_INPUT_FORMAT_CLASSNAME;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.hive.config.SourceConfig;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategyFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.HdfsFileSource;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import com.google.auto.service.AutoService;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.metastore.api.Table;
 
-import java.io.IOException;
-import java.util.List;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 @AutoService(SeaTunnelSource.class)
-public class HiveSource implements SeaTunnelSource<SeaTunnelRow, 
HiveSourceSplit, HiveSourceState> {
-
-    private SeaTunnelRowType typeInfo;
-
-    private ReadStrategy readStrategy;
-
-    private HadoopConf hadoopConf;
-
-    private List<String> filesPath;
+public class HiveSource extends HdfsFileSource {
+    private Table tableInformation;
 
     @Override
     public String getPluginName() {
@@ -60,51 +52,33 @@ public class HiveSource implements 
SeaTunnelSource<SeaTunnelRow, HiveSourceSplit
     }
 
     @Override
-    public void prepare(Config pluginConfig) {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
FILE_PATH, FS_DEFAULT_NAME_KEY);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HiveConfig.METASTORE_URI, HiveConfig.TABLE_NAME);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
         }
-        // use factory to generate readStrategy
-        readStrategy = 
ReadStrategyFactory.of(pluginConfig.getString(SourceConfig.FILE_TYPE));
-        String path = pluginConfig.getString(FILE_PATH);
-        hadoopConf = new 
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
-        try {
-            filesPath = readStrategy.getFileNamesByPath(hadoopConf, path);
-        } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
"Check file path fail.");
+        Pair<String[], Table> tableInfo = 
HiveConfig.getTableInfo(pluginConfig);
+        tableInformation = tableInfo.getRight();
+        String inputFormat = tableInformation.getSd().getInputFormat();
+        if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE, 
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
+        } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE, 
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+        } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
+            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE, 
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+        } else {
+            throw new RuntimeException("Only support [text parquet orc] file 
now");
         }
+        String hdfsLocation = tableInformation.getSd().getLocation();
         try {
-            // should read from config or read from hive metadata( wait catlog 
done)
-            this.typeInfo = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, 
filesPath.get(0));
-        } catch (HivePluginException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
"Read hive file type error.", e);
+            URI uri = new URI(hdfsLocation);
+            String path = uri.getPath();
+            String defaultFs = hdfsLocation.replace(path, "");
+            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_PATH, 
ConfigValueFactory.fromAnyRef(path))
+                    .withValue(FS_DEFAULT_NAME_KEY, 
ConfigValueFactory.fromAnyRef(defaultFs));
+        } catch (URISyntaxException e) {
+            throw new RuntimeException("Get hdfs cluster address failed, 
please check.", e);
         }
+        super.prepare(pluginConfig);
     }
-
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.typeInfo;
-    }
-
-    @Override
-    public SourceReader<SeaTunnelRow, HiveSourceSplit> 
createReader(SourceReader.Context readerContext) throws Exception {
-        return new HiveSourceReader(this.readStrategy, this.hadoopConf, 
readerContext);
-    }
-
-    @Override
-    public Boundedness getBoundedness() {
-        return Boundedness.BOUNDED;
-    }
-
-    @Override
-    public SourceSplitEnumerator<HiveSourceSplit, HiveSourceState> 
createEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit> 
enumeratorContext) throws Exception {
-        return new HiveSourceSplitEnumerator(enumeratorContext, filesPath);
-    }
-
-    @Override
-    public SourceSplitEnumerator<HiveSourceSplit, HiveSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit> 
enumeratorContext, HiveSourceState checkpointState) throws Exception {
-        return new HiveSourceSplitEnumerator(enumeratorContext, filesPath, 
checkpointState);
-    }
-
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceReader.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceReader.java
deleted file mode 100644
index 570f48e35..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceReader.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-@Slf4j
-public class HiveSourceReader implements SourceReader<SeaTunnelRow, 
HiveSourceSplit> {
-
-    private static final long THREAD_WAIT_TIME = 500L;
-
-    private ReadStrategy readStrategy;
-
-    private HadoopConf hadoopConf;
-
-    private Set<HiveSourceSplit> sourceSplits;
-
-    private final SourceReader.Context context;
-
-    public HiveSourceReader(ReadStrategy readStrategy, HadoopConf hadoopConf, 
SourceReader.Context context) {
-        this.readStrategy = readStrategy;
-        this.hadoopConf = hadoopConf;
-        this.context = context;
-        this.sourceSplits = new HashSet<>();
-    }
-
-    @Override
-    public void open() {
-        readStrategy.init(hadoopConf);
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        if (sourceSplits.isEmpty()) {
-            Thread.sleep(THREAD_WAIT_TIME);
-            return;
-        }
-        sourceSplits.forEach(source -> {
-            try {
-                readStrategy.read(source.splitId(), output);
-            } catch (Exception e) {
-                throw new RuntimeException("Hive source read error", e);
-            }
-
-        });
-        context.signalNoMoreElement();
-    }
-
-    @Override
-    public List<HiveSourceSplit> snapshotState(long checkpointId) {
-        return new ArrayList<>(sourceSplits);
-    }
-
-    @Override
-    public void addSplits(List<HiveSourceSplit> splits) {
-        sourceSplits.addAll(splits);
-    }
-
-    @Override
-    public void handleNoMoreSplits() {
-
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {
-
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplit.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplit.java
deleted file mode 100644
index 44e062e54..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplit.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.api.source.SourceSplit;
-
-public class HiveSourceSplit implements SourceSplit {
-
-    private static final long serialVersionUID = -1L;
-
-    private String splitId;
-
-    public HiveSourceSplit(String splitId) {
-        this.splitId = splitId;
-    }
-
-    @Override
-    public String splitId() {
-        return this.splitId;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplitEnumerator.java
deleted file mode 100644
index 301b1506f..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceSplitEnumerator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.config.Common;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HiveSourceSplitEnumerator implements 
SourceSplitEnumerator<HiveSourceSplit, HiveSourceState> {
-
-    private final Context<HiveSourceSplit> context;
-    private Set<HiveSourceSplit> pendingSplit;
-    private Set<HiveSourceSplit> assignedSplit;
-    private List<String> filePaths;
-
-    public 
HiveSourceSplitEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit> 
context, List<String> filePaths) {
-        this.context = context;
-        this.filePaths = filePaths;
-    }
-
-    public 
HiveSourceSplitEnumerator(SourceSplitEnumerator.Context<HiveSourceSplit> 
context, List<String> filePaths,
-                                     HiveSourceState sourceState) {
-        this(context, filePaths);
-        this.assignedSplit = sourceState.getAssignedSplit();
-    }
-
-    @Override
-    public void open() {
-        this.assignedSplit = new HashSet<>();
-        this.pendingSplit = new HashSet<>();
-    }
-
-    @Override
-    public void run() {
-        pendingSplit = getHiveFileSplit();
-        assignSplit(context.registeredReaders());
-    }
-
-    private Set<HiveSourceSplit> getHiveFileSplit() {
-        Set<HiveSourceSplit> hiveSourceSplits = new HashSet<>();
-        filePaths.forEach(k -> hiveSourceSplits.add(new HiveSourceSplit(k)));
-        return hiveSourceSplits;
-
-    }
-
-    @Override
-    public void close() throws IOException {
-
-    }
-
-    @Override
-    public void addSplitsBack(List<HiveSourceSplit> splits, int subtaskId) {
-        if (!splits.isEmpty()) {
-            pendingSplit.addAll(splits);
-            assignSplit(Collections.singletonList(subtaskId));
-        }
-    }
-
-    private void assignSplit(Collection<Integer> taskIDList) {
-        Map<Integer, List<HiveSourceSplit>> readySplit = new 
HashMap<>(Common.COLLECTION_SIZE);
-        for (int taskID : taskIDList) {
-            readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
-        }
-
-        pendingSplit.forEach(s -> readySplit.get(getSplitOwner(s.splitId(), 
taskIDList.size()))
-                .add(s));
-        readySplit.forEach(context::assignSplit);
-        assignedSplit.addAll(pendingSplit);
-        pendingSplit.clear();
-    }
-
-    private static int getSplitOwner(String tp, int numReaders) {
-        return tp.hashCode() % numReaders;
-    }
-
-    @Override
-    public int currentUnassignedSplitSize() {
-        return pendingSplit.size();
-    }
-
-    @Override
-    public void registerReader(int subtaskId) {
-        if (!pendingSplit.isEmpty()) {
-            assignSplit(Collections.singletonList(subtaskId));
-        }
-    }
-
-    @Override
-    public HiveSourceState snapshotState(long checkpointId) {
-        return new HiveSourceState(assignedSplit);
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {
-
-    }
-
-    @Override
-    public void handleSplitRequest(int subtaskId) {
-
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceState.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceState.java
deleted file mode 100644
index f982a71cb..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceState.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source;
-
-import java.io.Serializable;
-import java.util.Set;
-
-public class HiveSourceState implements Serializable {
-
-
-    private Set<HiveSourceSplit> assignedSplit;
-
-    public HiveSourceState(Set<HiveSourceSplit> assignedSplit) {
-        this.assignedSplit = assignedSplit;
-    }
-
-    public Set<HiveSourceSplit> getAssignedSplit() {
-        return assignedSplit;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/AbstractReadStrategy.java
deleted file mode 100644
index 2df0a21f7..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/AbstractReadStrategy.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
-
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public abstract class AbstractReadStrategy implements ReadStrategy {
-
-    HadoopConf hadoopConf;
-
-    @Override
-    public void init(HadoopConf conf) {
-        this.hadoopConf = conf;
-    }
-
-    @Override
-    public Configuration getConfiguration(HadoopConf hadoopConf) {
-        Configuration configuration = new Configuration();
-        configuration.set(FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
-        return configuration;
-    }
-
-    Configuration getConfiguration() throws HivePluginException {
-        if (null == hadoopConf) {
-            throw new HivePluginException("Not init read config");
-        }
-        return getConfiguration(hadoopConf);
-    }
-
-    boolean checkFileType(String path) {
-        return true;
-    }
-
-    @Override
-    public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) 
throws IOException {
-        Configuration configuration = getConfiguration(hadoopConf);
-        List<String> fileNames = new ArrayList<>();
-        FileSystem hdfs = FileSystem.get(configuration);
-        Path listFiles = new Path(path);
-        FileStatus[] stats = hdfs.listStatus(listFiles);
-        for (FileStatus fileStatus : stats) {
-            if (fileStatus.isDirectory()) {
-                fileNames.addAll(getFileNamesByPath(hadoopConf, 
fileStatus.getPath().toString()));
-                continue;
-            }
-            if (fileStatus.isFile()) {
-                // filter '_SUCCESS' file
-                if (!fileStatus.getPath().getName().equals("_SUCCESS")) {
-                    fileNames.add(fileStatus.getPath().toString());
-                }
-            }
-        }
-        return fileNames;
-    }
-
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
deleted file mode 100644
index 96ff1e983..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/OrcReadStrategy.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
-import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.orc.Reader;
-import org.apache.orc.TypeDescription;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Properties;
-
-@Slf4j
-public class OrcReadStrategy extends AbstractReadStrategy {
-
-    private SeaTunnelRowType seaTunnelRowTypeInfo;
-    private static final long MIN_SIZE = 16 * 1024;
-
-    @Override
-    public void read(String path, Collector<SeaTunnelRow> output) throws 
Exception {
-        if (Boolean.FALSE.equals(checkFileType(path))) {
-            throw new Exception("please check file type");
-        }
-        JobConf conf = new JobConf();
-        Path filePath = new Path(path);
-        Properties p = new Properties();
-        OrcSerde serde = new OrcSerde();
-        String columns = String.join(",", 
seaTunnelRowTypeInfo.getFieldNames());
-        p.setProperty("columns", columns);
-        //support types
-        serde.initialize(conf, p);
-        StructObjectInspector inspector = (StructObjectInspector) 
serde.getObjectInspector();
-        InputFormat<NullWritable, OrcStruct> in = new OrcInputFormat();
-        FileInputFormat.setInputPaths(conf, filePath);
-        InputSplit[] splits = in.getSplits(conf, 1);
-
-        conf.set("hive.io.file.readcolumn.ids", "1");
-        RecordReader<NullWritable, OrcStruct> reader = 
in.getRecordReader(splits[0], conf, Reporter.NULL);
-        NullWritable key = reader.createKey();
-        OrcStruct value = reader.createValue();
-        List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-        while (reader.next(key, value)) {
-            Object[] datas = new Object[fields.size()];
-            for (int i = 0; i < fields.size(); i++) {
-                Object data = inspector.getStructFieldData(value, 
fields.get(i));
-                if (null != data) {
-                    datas[i] = String.valueOf(data);
-                } else {
-                    datas[i] = null;
-                }
-            }
-            output.collect(new SeaTunnelRow(datas));
-        }
-        reader.close();
-    }
-
-    @Override
-    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws HivePluginException {
-
-        if (null != seaTunnelRowTypeInfo) {
-            return seaTunnelRowTypeInfo;
-        }
-        Configuration configuration = getConfiguration(hadoopConf);
-        Path dstDir = new Path(path);
-        Reader reader;
-        try {
-            reader = OrcFile.createReader(FileSystem.get(configuration), 
dstDir);
-        } catch (IOException e) {
-            throw new HivePluginException("Create OrcReader Fail", e);
-        }
-
-        TypeDescription schema = reader.getSchema();
-        String[] fields = new String[schema.getFieldNames().size()];
-        SeaTunnelDataType[] types = new 
SeaTunnelDataType[schema.getFieldNames().size()];
-
-        for (int i = 0; i < schema.getFieldNames().size(); i++) {
-            fields[i] = schema.getFieldNames().get(i);
-            types[i] = BasicType.STRING_TYPE;
-        }
-        seaTunnelRowTypeInfo = new SeaTunnelRowType(fields, types);
-        return seaTunnelRowTypeInfo;
-    }
-
-    @SuppressWarnings("checkstyle:MagicNumber")
-    @Override
-    boolean checkFileType(String path) {
-        try {
-            boolean checkResult;
-            Configuration configuration = getConfiguration();
-            FileSystem fileSystem = FileSystem.get(configuration);
-            Path filePath = new Path(path);
-            FSDataInputStream in = fileSystem.open(filePath);
-            // try to get Postscript in orc file
-            long size = fileSystem.getFileStatus(filePath).getLen();
-            int readSize = (int) Math.min(size, MIN_SIZE);
-            in.seek(size - readSize);
-            ByteBuffer buffer = ByteBuffer.allocate(readSize);
-            in.readFully(buffer.array(), buffer.arrayOffset() + 
buffer.position(), buffer.remaining());
-            int psLen = buffer.get(readSize - 1) & 0xff;
-            int len = OrcFile.MAGIC.length();
-            if (psLen < len + 1) {
-                in.close();
-                return false;
-            }
-            int offset = buffer.arrayOffset() + buffer.position() + 
buffer.limit() - 1 - len;
-            byte[] array = buffer.array();
-            if (Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
-                checkResult = true;
-            } else {
-                // If it isn't there, this may be the 0.11.0 version of ORC.
-                // Read the first 3 bytes of the file to check for the header
-                in.seek(0);
-                byte[] header = new byte[len];
-                in.readFully(header, 0, len);
-                // if it isn't there, this isn't an ORC file
-                checkResult = Text.decode(header, 0, 
len).equals(OrcFile.MAGIC);
-            }
-            in.close();
-            return checkResult;
-        } catch (HivePluginException | IOException e) {
-            String errorMsg = String.format("Check orc file [%s] error", path);
-            throw new RuntimeException(errorMsg, e);
-        }
-    }
-}
-
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
deleted file mode 100644
index 35d70e3ea..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ParquetReadStrategy.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroParquetReader;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetReader;
-import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class ParquetReadStrategy extends AbstractReadStrategy {
-
-    private SeaTunnelRowType seaTunnelRowType;
-
-    private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 
'A', (byte) 'R', (byte) '1'};
-
-    @Override
-    public void read(String path, Collector<SeaTunnelRow> output) throws 
Exception {
-        if (Boolean.FALSE.equals(checkFileType(path))) {
-            throw new Exception("please check file type");
-        }
-        Path filePath = new Path(path);
-        HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, 
getConfiguration());
-        int fieldsCount = seaTunnelRowType.getTotalFields();
-        GenericRecord record;
-        try (ParquetReader<GenericData.Record> reader = 
AvroParquetReader.<GenericData.Record>builder(hadoopInputFile).build()) {
-            while ((record = reader.read()) != null) {
-                Object[] fields = new Object[fieldsCount];
-                for (int i = 0; i < fieldsCount; i++) {
-                    Object data = record.get(i);
-                    try {
-                        if (data instanceof GenericData.Fixed) {
-                            // judge the data in upstream is or not decimal 
type
-                            data = fixed2String((GenericData.Fixed) data);
-                        } else if (data instanceof ArrayList) {
-                            // judge the data in upstream is or not array type
-                            data = 
array2String((ArrayList<GenericData.Record>) data);
-                        }
-                    } catch (Exception e) {
-                        data = record.get(i);
-                    } finally {
-                        fields[i] = data.toString();
-                    }
-                }
-                output.collect(new SeaTunnelRow(fields));
-            }
-        }
-    }
-
-    @Override
-    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws HivePluginException {
-        if (seaTunnelRowType != null) {
-            return seaTunnelRowType;
-        }
-        Configuration configuration = getConfiguration(hadoopConf);
-        Path filePath = new Path(path);
-        ParquetMetadata metadata;
-        try {
-            metadata = ParquetFileReader.readFooter(configuration, filePath);
-        } catch (IOException e) {
-            throw new HivePluginException("Create parquet reader failed", e);
-        }
-        FileMetaData fileMetaData = metadata.getFileMetaData();
-        MessageType schema = fileMetaData.getSchema();
-        int fieldCount = schema.getFieldCount();
-        String[] fields = new String[fieldCount];
-        SeaTunnelDataType[] types = new SeaTunnelDataType[fieldCount];
-        for (int i = 0; i < fieldCount; i++) {
-            fields[i] = schema.getFieldName(i);
-            // Temporarily each field is treated as a string type
-            // I think we can use the schema information to build seatunnel 
column type
-            types[i] = BasicType.STRING_TYPE;
-        }
-        seaTunnelRowType = new SeaTunnelRowType(fields, types);
-        return seaTunnelRowType;
-    }
-
-    private String fixed2String(GenericData.Fixed fixed) {
-        Schema schema = fixed.getSchema();
-        byte[] bytes = fixed.bytes();
-        int precision = 
Integer.parseInt(schema.getObjectProps().get("precision").toString());
-        int scale = 
Integer.parseInt(schema.getObjectProps().get("scale").toString());
-        BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
-        return bigDecimal.toString();
-    }
-
-    @SuppressWarnings("checkstyle:MagicNumber")
-    private BigDecimal bytes2Decimal(byte[] bytesArray, int precision, int 
scale) {
-        Binary value = Binary.fromConstantByteArray(bytesArray);
-        if (precision <= 18) {
-            ByteBuffer buffer = value.toByteBuffer();
-            byte[] bytes = buffer.array();
-            int start = buffer.arrayOffset() + buffer.position();
-            int end = buffer.arrayOffset() + buffer.limit();
-            long unscaled = 0L;
-            int i = start;
-            while (i < end) {
-                unscaled = unscaled << 8 | bytes[i] & 0xff;
-                i++;
-            }
-            int bits = 8 * (end - start);
-            long unscaledNew = (unscaled << (64 - bits)) >> (64 - bits);
-            if (unscaledNew <= -Math.pow(10, 18) || unscaledNew >= 
Math.pow(10, 18)) {
-                return new BigDecimal(unscaledNew);
-            } else {
-                return BigDecimal.valueOf(unscaledNew / Math.pow(10, scale));
-            }
-        } else {
-            return new BigDecimal(new BigInteger(value.getBytes()), scale);
-        }
-    }
-
-    @Override
-    boolean checkFileType(String path) {
-        boolean checkResult;
-        byte[] magic = new byte[PARQUET_MAGIC.length];
-        try {
-            Configuration configuration = getConfiguration();
-            FileSystem fileSystem = FileSystem.get(configuration);
-            Path filePath = new Path(path);
-            FSDataInputStream in = fileSystem.open(filePath);
-            // try to get header information in a parquet file
-            in.seek(0);
-            in.readFully(magic);
-            checkResult = Arrays.equals(magic, PARQUET_MAGIC);
-            in.close();
-            return checkResult;
-        } catch (HivePluginException | IOException e) {
-            String errorMsg = String.format("Check parquet file [%s] error", 
path);
-            throw new RuntimeException(errorMsg, e);
-        }
-    }
-
-    private String array2String(ArrayList<GenericData.Record> data) throws 
JsonProcessingException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        List<String> values = data.stream().map(record -> 
record.get(0).toString()).collect(Collectors.toList());
-        return objectMapper.writeValueAsString(values);
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategy.java
deleted file mode 100644
index 5e7301914..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-public interface ReadStrategy extends Serializable {
-    void init(HadoopConf conf);
-
-    Configuration getConfiguration(HadoopConf conf);
-
-    void read(String path, Collector<SeaTunnelRow> output) throws Exception;
-
-    SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String 
path) throws HivePluginException;
-
-    List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws 
IOException;
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
deleted file mode 100644
index 56e88aa44..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/ReadStrategyFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type.FileTypeEnum;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ReadStrategyFactory {
-
-    private ReadStrategyFactory() {}
-
-    public static ReadStrategy of(String fileType) {
-        try {
-            FileTypeEnum fileTypeEnum = 
FileTypeEnum.valueOf(fileType.toUpperCase());
-            return fileTypeEnum.getReadStrategy();
-        } catch (IllegalArgumentException e) {
-            log.warn("Hive plugin not support this file type [{}], it will be 
treated as a plain text file", fileType);
-            return new TextReadStrategy();
-        }
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/TextReadStrategy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/TextReadStrategy.java
deleted file mode 100644
index 6b014d737..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/format/TextReadStrategy.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HivePluginException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.HadoopConf;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-
-public class TextReadStrategy extends AbstractReadStrategy {
-
-    private static final String TEXT_FIELD_NAME = "lines";
-
-    @Override
-    public void read(String path, Collector<SeaTunnelRow> output) throws 
IOException, HivePluginException {
-        Configuration conf = getConfiguration();
-        FileSystem fs = FileSystem.get(conf);
-        Path filePath = new Path(path);
-        try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
-            reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new 
String[]{line})));
-        }
-    }
-
-    @Override
-    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) {
-        return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
-                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
deleted file mode 100644
index 3cf986b18..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/file/reader/type/FileTypeEnum.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.type;
-
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.OrcReadStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ParquetReadStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.ReadStrategy;
-import 
org.apache.seatunnel.connectors.seatunnel.hive.source.file.reader.format.TextReadStrategy;
-
-public enum FileTypeEnum {
-    ORC {
-        @Override
-        public ReadStrategy getReadStrategy() {
-            return new OrcReadStrategy();
-        }
-    },
-    PARQUET {
-        @Override
-        public ReadStrategy getReadStrategy() {
-            return new ParquetReadStrategy();
-        }
-    },
-    TEXT {
-        @Override
-        public ReadStrategy getReadStrategy() {
-            return new TextReadStrategy();
-        }
-    };
-
-    public ReadStrategy getReadStrategy() {
-        return null;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 30c9a2eba..d813a03a3 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,18 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import lombok.NonNull;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
 
-public class HiveMetaStoreProxy {
+import java.util.List;
+import java.util.Objects;
 
-    private HiveMetaStoreClient hiveMetaStoreClient;
+public class HiveMetaStoreProxy {
+    private final HiveMetaStoreClient hiveMetaStoreClient;
+    private static volatile HiveMetaStoreProxy INSTANCE = null;
 
-    public HiveMetaStoreProxy(@NonNull String uris) {
+    private HiveMetaStoreProxy(@NonNull String uris) {
         HiveConf hiveConf = new HiveConf();
         hiveConf.set("hive.metastore.uris", uris);
         try {
@@ -38,15 +46,40 @@ public class HiveMetaStoreProxy {
         }
     }
 
+    public static HiveMetaStoreProxy getInstance(Config config) {
+        if (INSTANCE == null) {
+            synchronized (HiveMetaStoreProxy.class) {
+                if (INSTANCE == null) {
+                    String metastoreUri = 
config.getString(HiveConfig.METASTORE_URI);
+                    INSTANCE = new HiveMetaStoreProxy(metastoreUri);
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
     public Table getTable(@NonNull String dbName, @NonNull String tableName) {
         try {
             return hiveMetaStoreClient.getTable(dbName, tableName);
         } catch (TException e) {
-            throw new RuntimeException(e);
+            String errorMsg = String.format("Get table [%s.%s] information 
failed", dbName, tableName);
+            throw new RuntimeException(errorMsg, e);
         }
     }
 
-    public HiveMetaStoreClient getHiveMetaStoreClient() {
-        return hiveMetaStoreClient;
+    public List<FieldSchema> getTableFields(@NonNull String dbName, @NonNull 
String tableName) {
+        try {
+            return hiveMetaStoreClient.getFields(dbName, tableName);
+        } catch (TException e) {
+            String errorMsg = String.format("Get table [%s.%s] fields 
information failed", dbName, tableName);
+            throw new RuntimeException(errorMsg, e);
+        }
+    }
+
+    public synchronized void close() {
+        if (Objects.nonNull(hiveMetaStoreClient)) {
+            hiveMetaStoreClient.close();
+            HiveMetaStoreProxy.INSTANCE = null;
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
 
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
deleted file mode 100644
index e822a49c6..000000000
--- 
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSinkConfigTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
-
-import java.io.File;
-import java.util.List;
-
-public class HiveSinkConfigTest {
-
-    /**
-     * test hive sink config.
-     * <p> TODO: Uncouple from the hive environment
-     */
-    public void testCreateHiveSinkConfig() {
-        String[] fieldNames = new String[]{"name", "age"};
-        SeaTunnelDataType[] seaTunnelDataTypes = new 
SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE};
-        SeaTunnelRowType seaTunnelRowTypeInfo = new 
SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
-        String configFile = "fakesource_to_hive.conf";
-        String configFilePath = System.getProperty("user.dir") + 
"/src/test/resources/" + configFile;
-        Config config = ConfigFactory
-            .parseFile(new File(configFilePath))
-            .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-            .resolveWith(ConfigFactory.systemProperties(),
-                ConfigResolveOptions.defaults().setAllowUnresolved(true));
-        List<? extends Config> sink = config.getConfigList("sink");
-        HiveSinkConfig hiveSinkConfig = new HiveSinkConfig(sink.get(0), 
seaTunnelRowTypeInfo);
-    }
-}

Reply via email to