This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a64e177d06 [Feature][Hive JDBC Source] Support Hive JDBC Source Connector (#5424)
a64e177d06 is described below

commit a64e177d063f94842c54b563b64e3509663e7bf1
Author: Nick <[email protected]>
AuthorDate: Tue Nov 7 13:42:48 2023 +0800

    [Feature][Hive JDBC Source] Support Hive JDBC Source Connector (#5424)
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |   2 +
 docs/en/connector-v2/source/Hive-jdbc.md           | 164 +++++++++++++++++++
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  17 ++
 .../jdbc/config/JdbcConnectionConfig.java          |  43 ++++-
 .../seatunnel/jdbc/config/JdbcOptions.java         |  27 ++++
 .../jdbc/exception/JdbcConnectorErrorCode.java     |   5 +-
 .../connection/SimpleJdbcConnectionProvider.java   |  12 +-
 .../jdbc/internal/dialect/JdbcDialect.java         |   8 +
 .../jdbc/internal/dialect/hive/HiveDialect.java    |  66 ++++++++
 .../internal/dialect/hive/HiveDialectFactory.java  |  38 +++++
 .../dialect/hive/HiveJdbcConnectionProvider.java   |  79 ++++++++++
 .../dialect/hive/HiveJdbcRowConverter.java         |  43 +++++
 .../jdbc/internal/dialect/hive/HiveTypeMapper.java | 120 ++++++++++++++
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |   3 +-
 .../seatunnel/jdbc/source/ChunkSplitter.java       |   5 +-
 .../seatunnel/jdbc/utils/HiveJdbcUtils.java        |  68 ++++++++
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     |   8 +-
 .../connector-jdbc-e2e-part-3/pom.xml              |   5 +
 .../connectors/seatunnel/jdbc/JdbcHiveIT.java      | 173 +++++++++++++++++++++
 .../resources/jdbc_hive_source_and_assert.conf     | 141 +++++++++++++++++
 20 files changed, 1013 insertions(+), 14 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 03c9412424..b7e59b74cf 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -194,6 +194,8 @@ problems encountered by users.
 | JDBC-04 | Connector database failed                                      | When users encounter this error code, it means that the database connection failed; check whether the url is correct and whether the corresponding service is healthy                |
 | JDBC-05 | transaction operation failed, such as (commit, rollback) etc.. | When users encounter this error code, it means that a sql transaction failed; check the transaction execution of the corresponding database to determine the cause of the failure    |
 | JDBC-06 | No suitable dialect factory found                              | When users encounter this error code, it means that the dialect type may be unsupported                                                                                              |
+| JDBC-07 | The JDBC type does not support sink                            | When users encounter this error code, it means that this JDBC type does not support being used as a sink                                                                             |
+| JDBC-08 | Kerberos authentication failed                                 | When users encounter this error code, it means that Kerberos authentication of the database connection failed                                                                        |
 
 ## Pulsar Connector Error Codes
 
diff --git a/docs/en/connector-v2/source/Hive-jdbc.md b/docs/en/connector-v2/source/Hive-jdbc.md
new file mode 100644
index 0000000000..9b4c4e1102
--- /dev/null
+++ b/docs/en/connector-v2/source/Hive-jdbc.md
@@ -0,0 +1,164 @@
+# Hive
+
+> JDBC Hive Source Connector
+
+## Supported Hive Versions
+
+- Tested with 3.1.3 and 3.1.2; other versions have not been verified.
+
+## Supported Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL and can achieve a projection effect by selecting only the required columns in the query.
+
+## Description
+
+Reads data from an external data source through JDBC.
+
+## Supported DataSource Info
+
+| Datasource |                 Supported versions                    |             Driver              |                 Url                  |                                   Maven                                   |
+|------------|-------------------------------------------------------|---------------------------------|--------------------------------------|---------------------------------------------------------------------------|
+| Hive       | The driver class differs across dependency versions.  | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000/default | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc)  |
+
+## Database Dependency
+
+> Please download the driver listed under 'Maven' above and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/'
+> working directory.<br/>
+> For example, for the Hive datasource: cp hive-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+|                                  Hive Data type                                  | SeaTunnel Data type |
+|----------------------------------------------------------------------------------|---------------------|
+| BOOLEAN                                                                          | BOOLEAN             |
+| TINYINT                                                                          | BYTE                |
+| SMALLINT                                                                         | SHORT               |
+| INT<br/>INTEGER                                                                  | INT                 |
+| BIGINT                                                                           | LONG                |
+| FLOAT                                                                            | FLOAT               |
+| DOUBLE<br/>DOUBLE PRECISION                                                      | DOUBLE              |
+| DECIMAL(x,y)<br/>NUMERIC(x,y)<br/>(when the column declares a precision)         | DECIMAL(x,y)        |
+| DECIMAL<br/>NUMERIC<br/>(when no precision is declared)                          | DECIMAL(38,18)      |
+| CHAR<br/>VARCHAR<br/>STRING                                                      | STRING              |
+| DATE                                                                             | DATE                |
+| DATETIME<br/>TIMESTAMP                                                           | TIMESTAMP           |
+| BINARY<br/>ARRAY<br/>INTERVAL<br/>MAP<br/>STRUCT<br/>UNIONTYPE                   | Not supported yet   |
+
+## Source Options
+
+|             Name             |    Type    | Required |     Default     | Description                                                                                                                                                                                                                  |
+|------------------------------|------------|----------|-----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                          | String     | Yes      | -               | The URL of the JDBC connection, e.g. jdbc:hive2://localhost:10000/default                                                                                                                                                    |
+| driver                       | String     | Yes      | -               | The jdbc class name used to connect to the remote data source; for Hive the value is `org.apache.hive.jdbc.HiveDriver`.                                                                                                      |
+| user                         | String     | No       | -               | Connection instance user name                                                                                                                                                                                                |
+| password                     | String     | No       | -               | Connection instance password                                                                                                                                                                                                 |
+| query                        | String     | Yes      | -               | Query statement                                                                                                                                                                                                              |
+| connection_check_timeout_sec | Int        | No       | 30              | The time in seconds to wait for the database operation used to validate the connection to complete                                                                                                                          |
+| partition_column             | String     | No       | -               | The column name used to partition the scan for parallelism; only numeric columns (typically the primary key) are supported, and only one column can be configured.                                                           |
+| partition_lower_bound        | BigDecimal | No       | -               | The minimum value of partition_column for the scan; if not set, SeaTunnel queries the database for the minimum value.                                                                                                        |
+| partition_upper_bound        | BigDecimal | No       | -               | The maximum value of partition_column for the scan; if not set, SeaTunnel queries the database for the maximum value.                                                                                                        |
+| partition_num                | Int        | No       | job parallelism | The number of partitions; only positive integers are supported. Defaults to the job parallelism.                                                                                                                             |
+| fetch_size                   | Int        | No       | 0               | For queries that return a large number of objects, you can configure the row fetch size used in the query to improve performance by reducing the number of database hits required to satisfy the selection criteria. Zero means use the jdbc default value. |
+| common-options               |            | No       | -               | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details                                                                                                                      |
+| use_kerberos                 | Boolean    | No       | false           | Whether to enable Kerberos authentication; see the Kerberos Example below.                                                                                                                                                   |
+| kerberos_principal           | String     | No       | -               | When using kerberos, the kerberos principal, e.g. 'test_user@xxx'.                                                                                                                                                           |
+| kerberos_keytab_path         | String     | No       | -               | When using kerberos, the kerberos keytab file path, e.g. '/home/test/test_user.keytab'.                                                                                                                                      |
+| krb5_path                    | String     | No       | /etc/krb5.conf  | When using kerberos, the krb5.conf file path, e.g. '/seatunnel/krb5.conf'; defaults to '/etc/krb5.conf'.                                                                                                                     |
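+
+### Kerberos Example
+
+> A minimal sketch of a Kerberos-enabled Hive JDBC source using the options above. The principal, keytab
+> and krb5.conf values below are placeholders; substitute the ones provisioned for your own cluster.
+
+```
+source {
+    Jdbc {
+        url = "jdbc:hive2://localhost:10000/default"
+        driver = "org.apache.hive.jdbc.HiveDriver"
+        query = "select * from type_bin limit 16"
+        use_kerberos = true
+        # Placeholder principal for illustration only
+        kerberos_principal = "test_user@EXAMPLE.COM"
+        # Placeholder keytab path on the node running SeaTunnel
+        kerberos_keytab_path = "/home/test/test_user.keytab"
+        # Optional; defaults to /etc/krb5.conf
+        krb5_path = "/etc/krb5.conf"
+    }
+}
+```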
+
+### Tips
+
+> If partition_column is not set, the job runs with single concurrency; if partition_column is set, it is executed
+> in parallel according to the task concurrency. When your shard field is a large numeric type such as bigint (and
+> above) and the data is not evenly distributed, it is recommended to set the parallelism to 1 to ensure that the
+> data skew problem is resolved.
+
+## Task Example
+
+### Simple:
+
+> This example queries 16 rows from the type_bin table in your test database in single parallelism and queries all of
+> its fields. You can also specify which fields to query for final output to the console.
+
+```
+# Defining the runtime environment
+env {
+  # You can set flink configuration here
+  execution.parallelism = 2
+  job.mode = "BATCH"
+}
+source {
+    Jdbc {
+        url = "jdbc:hive2://localhost:10000/default"
+        driver = "org.apache.hive.jdbc.HiveDriver"
+        connection_check_timeout_sec = 100
+        query = "select * from type_bin limit 16"
+    }
+}
+
+transform {
+    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform-v2/sql
+}
+
+sink {
+    Console {}
+}
+```
+
+### Parallel:
+
+> Read your query table in parallel with the shard field you configured and the shard data. You can do this if you want
+> to read the whole table.
+
+```
+source {
+    Jdbc {
+        url = "jdbc:hive2://localhost:10000/default"
+        driver = "org.apache.hive.jdbc.HiveDriver"
+        connection_check_timeout_sec = 100
+        # Define query logic as required
+        query = "select * from type_bin"
+        # Parallel sharding reads fields
+        partition_column = "id"
+        # Number of fragments
+        partition_num = 10
+    }
+}
+```
+
+### Parallel Boundary:
+
+> It is more efficient to specify the upper and lower bounds of the query: reading your data source within the
+> boundaries you configure avoids querying the database for the minimum and maximum values first.
+
+```
+source {
+    Jdbc {
+        url = "jdbc:hive2://localhost:10000/default"
+        driver = "org.apache.hive.jdbc.HiveDriver"
+        connection_check_timeout_sec = 100
+        # Define query logic as required
+        query = "select * from type_bin"
+        partition_column = "id"
+        # Read start boundary
+        partition_lower_bound = 1
+        # Read end boundary
+        partition_upper_bound = 500
+        partition_num = 10
+    }
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 2d081fa583..6c64c9c667 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -48,6 +48,7 @@
         <hikari.version>4.0.3</hikari.version>
         <postgis.jdbc.version>2.5.1</postgis.jdbc.version>
         <kingbase8.version>8.6.0</kingbase8.version>
+        <hive.jdbc.version>3.1.3</hive.jdbc.version>
         <oceanbase.jdbc.version>2.4.3</oceanbase.jdbc.version>
     </properties>
 
@@ -169,6 +170,18 @@
                 <version>${kingbase8.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-jdbc</artifactId>
+                <version>${hive.jdbc.version}</version>
+                <scope>provided</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>jdk.tools</groupId>
+                        <artifactId>jdk.tools</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <dependency>
                 <groupId>com.oceanbase</groupId>
                 <artifactId>oceanbase-client</artifactId>
@@ -269,6 +282,10 @@
             <groupId>cn.com.kingbase</groupId>
             <artifactId>kingbase8</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.oceanbase</groupId>
             <artifactId>oceanbase-client</artifactId>
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index eeeff227b2..2362f827b6 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -47,6 +47,14 @@ public class JdbcConnectionConfig implements Serializable {
 
     public int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
 
+    public boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
+
+    public String kerberosPrincipal;
+
+    public String kerberosKeytabPath;
+
+    public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
+
     private Map<String, String> properties;
 
     public static JdbcConnectionConfig of(ReadonlyConfig config) {
@@ -64,7 +72,12 @@ public class JdbcConnectionConfig implements Serializable {
             builder.transactionTimeoutSec(config.get(JdbcOptions.TRANSACTION_TIMEOUT_SEC));
             builder.maxRetries(0);
         }
-
+        if (config.get(JdbcOptions.USE_KERBEROS)) {
+            builder.useKerberos(config.get(JdbcOptions.USE_KERBEROS));
+            builder.kerberosPrincipal(config.get(JdbcOptions.KERBEROS_PRINCIPAL));
+            builder.kerberosKeytabPath(config.get(JdbcOptions.KERBEROS_KEYTAB_PATH));
+            builder.krb5Path(config.get(JdbcOptions.KRB5_PATH));
+        }
         config.getOptional(JdbcOptions.USER).ifPresent(builder::username);
         config.getOptional(JdbcOptions.PASSWORD).ifPresent(builder::password);
         config.getOptional(JdbcOptions.PROPERTIES).ifPresent(builder::properties);
@@ -143,6 +156,10 @@ public class JdbcConnectionConfig implements Serializable {
         private int maxCommitAttempts = JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
         private int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
         private Map<String, String> properties;
+        private boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
+        private String kerberosPrincipal;
+        private String kerberosKeytabPath;
+        private String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
 
         private Builder() {}
 
@@ -211,6 +228,26 @@ public class JdbcConnectionConfig implements Serializable {
             return this;
         }
 
+        public Builder useKerberos(boolean useKerberos) {
+            this.useKerberos = useKerberos;
+            return this;
+        }
+
+        public Builder kerberosPrincipal(String kerberosPrincipal) {
+            this.kerberosPrincipal = kerberosPrincipal;
+            return this;
+        }
+
+        public Builder kerberosKeytabPath(String kerberosKeytabPath) {
+            this.kerberosKeytabPath = kerberosKeytabPath;
+            return this;
+        }
+
+        public Builder krb5Path(String krb5Path) {
+            this.krb5Path = krb5Path;
+            return this;
+        }
+
         public Builder properties(Map<String, String> properties) {
             this.properties = properties;
             return this;
@@ -230,6 +267,10 @@ public class JdbcConnectionConfig implements Serializable {
             jdbcConnectionConfig.transactionTimeoutSec = this.transactionTimeoutSec;
             jdbcConnectionConfig.maxCommitAttempts = this.maxCommitAttempts;
             jdbcConnectionConfig.xaDataSourceClassName = this.xaDataSourceClassName;
+            jdbcConnectionConfig.useKerberos = this.useKerberos;
+            jdbcConnectionConfig.kerberosPrincipal = this.kerberosPrincipal;
+            jdbcConnectionConfig.kerberosKeytabPath = this.kerberosKeytabPath;
+            jdbcConnectionConfig.krb5Path = this.krb5Path;
             jdbcConnectionConfig.properties =
                     this.properties == null ? new HashMap<>() : this.properties;
             return jdbcConnectionConfig;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 8f57837078..0493eb8e4f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -180,6 +180,33 @@ public interface JdbcOptions {
                     .noDefaultValue()
                     .withDescription("Whether case conversion is required");
 
+    Option<Boolean> USE_KERBEROS =
+            Options.key("use_kerberos")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable Kerberos, default is false.");
+
+    Option<String> KERBEROS_PRINCIPAL =
+            Options.key("kerberos_principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When using kerberos, we should set the kerberos principal such as 'test_user@xxx'.");
+
+    Option<String> KERBEROS_KEYTAB_PATH =
+            Options.key("kerberos_keytab_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When using kerberos, we should set the keytab file path such as '/home/test/test_user.keytab'.");
+
+    Option<String> KRB5_PATH =
+            Options.key("krb5_path")
+                    .stringType()
+                    .defaultValue("/etc/krb5.conf")
+                    .withDescription(
+                            "When using kerberos, we should set the krb5.conf file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf'.");
+
     Option<Map<String, String>> PROPERTIES =
             Options.key("properties")
                     .mapType()
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
index 3d53b102bd..22438de84c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
@@ -27,8 +27,9 @@ public enum JdbcConnectorErrorCode implements SeaTunnelErrorCode {
     CONNECT_DATABASE_FAILED("JDBC-04", "Connector database failed"),
     TRANSACTION_OPERATION_FAILED(
             "JDBC-05", "transaction operation failed, such as (commit, rollback) etc.."),
-    NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found");
-
+    NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found"),
+    DONT_SUPPORT_SINK("JDBC-07", "The JDBC type does not support sink"),
+    KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed");
     private final String code;
 
     private final String description;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
index 40b75ced6a..af329a99d4 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -43,7 +43,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
 
     private static final long serialVersionUID = 1L;
 
-    private final JdbcConnectionConfig jdbcConfig;
+    protected final JdbcConnectionConfig jdbcConfig;
 
     private transient Driver loadedDriver;
     protected transient Connection connection;
@@ -88,7 +88,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
         }
     }
 
-    private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
+    protected Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
         if (loadedDriver == null) {
             loadedDriver = loadDriver(jdbcConfig.getDriverName());
         }
@@ -141,4 +141,12 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
         closeConnection();
         return getOrEstablishConnection();
     }
+
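+    // Exposed so dialect-specific subclasses (e.g. HiveJdbcConnectionProvider) can reuse this provider.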
+    public JdbcConnectionConfig getJdbcConfig() {
+        return jdbcConfig;
+    }
+
+    public void setConnection(Connection connection) {
+        this.connection = connection;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 0f554df3e8..aa7c03ccdb 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -18,6 +18,9 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
 import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
@@ -391,4 +394,9 @@ public interface JdbcDialect extends Serializable {
             }
         }
     }
+
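+    /**
+     * Returns the connection provider for this dialect. Dialects that need a specialized
+     * provider (e.g. Hive with Kerberos support) can override this default.
+     */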
+    default JdbcConnectionProvider getJdbcConnectionProvider(
+            JdbcConnectionConfig jdbcConnectionConfig) {
+        return new SimpleJdbcConnectionProvider(jdbcConnectionConfig);
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
new file mode 100644
index 0000000000..08e68632f7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Optional;
+
+public class HiveDialect implements JdbcDialect {
+
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.HIVE;
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new HiveJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new HiveTypeMapper();
+    }
+
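+    // Hive does not support upsert, so no upsert statement is generated.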
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        return Optional.empty();
+    }
+
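+    // Execute the query to obtain result metadata, since Hive's JDBC driver does not provide
+    // metadata for a prepared-but-unexecuted statement.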
+    @Override
+    public ResultSetMetaData getResultSetMetaData(Connection conn, String query)
+            throws SQLException {
+        return conn.prepareStatement(query).executeQuery().getMetaData();
+    }
+
+    @Override
+    public JdbcConnectionProvider getJdbcConnectionProvider(
+            JdbcConnectionConfig jdbcConnectionConfig) {
+        return new HiveJdbcConnectionProvider(jdbcConnectionConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
new file mode 100644
index 0000000000..56bd81b7f8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link HiveDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class HiveDialectFactory implements JdbcDialectFactory {
+
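+    // Matches Hive JDBC URLs, e.g. jdbc:hive2://localhost:10000/default.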
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:hive2:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new HiveDialect();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
new file mode 100644
index 0000000000..1a45a8600a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode.KERBEROS_AUTHENTICATION_FAILED;
+
+public class HiveJdbcConnectionProvider extends SimpleJdbcConnectionProvider {
+
+    public HiveJdbcConnectionProvider(@NonNull JdbcConnectionConfig jdbcConfig) {
+        super(jdbcConfig);
+    }
+
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
+        if (isConnectionValid()) {
+            return super.getConnection();
+        }
+        JdbcConnectionConfig jdbcConfig = super.getJdbcConfig();
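+        // When use_kerberos is enabled, log in from the configured keytab before opening the connection.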
+        if (jdbcConfig.useKerberos) {
+            System.setProperty("java.security.krb5.conf", jdbcConfig.krb5Path);
+            Configuration configuration = new Configuration();
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                UserGroupInformation.loginUserFromKeytab(
+                        jdbcConfig.kerberosPrincipal, jdbcConfig.kerberosKeytabPath);
+            } catch (IOException e) {
+                throw new JdbcConnectorException(KERBEROS_AUTHENTICATION_FAILED, e);
+            }
+        }
+        Driver driver = getLoadedDriver();
+        Properties info = new Properties();
+        if (super.getJdbcConfig().getUsername().isPresent()) {
+            info.setProperty("user", 
super.getJdbcConfig().getUsername().get());
+        }
+        if (super.getJdbcConfig().getPassword().isPresent()) {
+            info.setProperty("password", 
super.getJdbcConfig().getPassword().get());
+        }
+        super.setConnection(driver.connect(super.getJdbcConfig().getUrl(), info));
+        if (super.getConnection() == null) {
+            // Throw the same exception as DriverManager.getConnection when no matching driver
+            // is found, to meet caller expectations.
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.NO_SUITABLE_DRIVER,
+                    "No suitable driver found for " + 
super.getJdbcConfig().getUrl());
+        }
+        return super.getConnection();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
new file mode 100644
index 0000000000..91ed90105c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import java.sql.PreparedStatement;
+
+public class HiveJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return DatabaseIdentifier.HIVE;
+    }
+
+    @Override
+    public PreparedStatement toExternal(
+            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement) {
+        throw new JdbcConnectorException(
+                JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
+                "The Hive JDBC connector does not support sink");
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveTypeMapper.java
new file mode 100644
index 0000000000..0940b5a2c6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveTypeMapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class HiveTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveTypeMapper.class);
+
+    // reference https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types
+
+    // Numeric Types
+    private static final String HIVE_TINYINT = "TINYINT";
+    private static final String HIVE_SMALLINT = "SMALLINT";
+    private static final String HIVE_INT = "INT";
+    private static final String HIVE_INTEGER = "INTEGER";
+    private static final String HIVE_BIGINT = "BIGINT";
+    private static final String HIVE_FLOAT = "FLOAT";
+    private static final String HIVE_DOUBLE = "DOUBLE";
+    private static final String HIVE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+    private static final String HIVE_DECIMAL = "DECIMAL";
+    private static final String HIVE_NUMERIC = "NUMERIC";
+    // Date/Time Types
+    private static final String HIVE_TIMESTAMP = "TIMESTAMP";
+    private static final String HIVE_DATE = "DATE";
+    private static final String HIVE_INTERVAL = "INTERVAL";
+    // String Types
+    private static final String HIVE_STRING = "STRING";
+    private static final String HIVE_VARCHAR = "VARCHAR";
+    private static final String HIVE_CHAR = "CHAR";
+    // Misc Types
+    private static final String HIVE_BOOLEAN = "BOOLEAN";
+    private static final String HIVE_BINARY = "BINARY";
+    // Complex Types
+    private static final String HIVE_ARRAY = "ARRAY";
+    private static final String HIVE_MAP = "MAP";
+    private static final String HIVE_STRUCT = "STRUCT";
+    private static final String HIVE_UNIONTYPE = "UNIONTYPE";
+
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String columnType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        int precision = metadata.getPrecision(colIndex);
+        switch (columnType) {
+            case HIVE_TINYINT:
+                return BasicType.BYTE_TYPE;
+            case HIVE_SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case HIVE_INT:
+            case HIVE_INTEGER:
+                return BasicType.INT_TYPE;
+            case HIVE_BIGINT:
+                return BasicType.LONG_TYPE;
+            case HIVE_FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case HIVE_DOUBLE:
+            case HIVE_DOUBLE_PRECISION:
+                return BasicType.DOUBLE_TYPE;
+            case HIVE_DECIMAL:
+            case HIVE_NUMERIC:
+                if (precision > 0) {
+                    return new DecimalType(precision, metadata.getScale(colIndex));
+                }
+                LOG.warn("Decimal column did not define precision/scale, falling back to Decimal(38,18)");
+                return new DecimalType(38, 18);
+            case HIVE_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case HIVE_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case HIVE_STRING:
+            case HIVE_VARCHAR:
+            case HIVE_CHAR:
+                return BasicType.STRING_TYPE;
+            case HIVE_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case HIVE_BINARY:
+            case HIVE_ARRAY:
+            case HIVE_INTERVAL:
+            case HIVE_MAP:
+            case HIVE_STRUCT:
+            case HIVE_UNIONTYPE:
+            default:
+                final String jdbcColumnName = metadata.getColumnName(colIndex);
+                throw new JdbcConnectorException(
+                        CommonErrorCode.UNSUPPORTED_OPERATION,
+                        String.format(
+                                "Doesn't support hive type '%s' on column '%s' 
 yet.",
+                                columnType, jdbcColumnName));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 376515d91e..5b6682cf8e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -30,7 +30,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
@@ -68,7 +67,7 @@ public class JdbcSinkWriter
         this.rowType = rowType;
         this.primaryKeyIndex = primaryKeyIndex;
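+        // Obtain the provider from the dialect so dialect-specific providers (e.g. Hive's
+        // Kerberos-aware provider) are used.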
         this.connectionProvider =
-                new SimpleJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
+                dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
         this.outputFormat =
                 new JdbcOutputFormatBuilder(dialect, connectionProvider, jdbcSinkConfig, rowType)
                         .build();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index 720bbfdbeb..355e95cc81 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 
@@ -64,11 +63,11 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
         this.config = config;
         this.autoCommit = config.getJdbcConnectionConfig().isAutoCommit();
         this.fetchSize = config.getFetchSize();
-        this.connectionProvider =
-                new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
         this.jdbcDialect =
                 JdbcDialectLoader.load(
                         config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
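+        // Load the dialect first so it can supply a dialect-specific connection provider (e.g. Hive).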
+        this.connectionProvider =
+                jdbcDialect.getJdbcConnectionProvider(config.getJdbcConnectionConfig());
     }
 
     public static ChunkSplitter create(JdbcSourceConfig config) {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
new file mode 100644
index 0000000000..0b503b3084
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode.KERBEROS_AUTHENTICATION_FAILED;
+
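+/** Performs Kerberos keytab login using the Kerberos settings carried by {@link JdbcConnectionConfig}. */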
+@Slf4j
+public class HiveJdbcUtils {
+
+    public static synchronized void doKerberosAuthentication(JdbcConnectionConfig jdbcConfig) {
+        String principal = jdbcConfig.kerberosPrincipal;
+        String keytabPath = jdbcConfig.kerberosKeytabPath;
+        String krb5Path = jdbcConfig.krb5Path;
+        System.setProperty("java.security.krb5.conf", krb5Path);
+        Configuration configuration = new Configuration();
+
+        if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) {
+            log.warn(
+                    "Principal [{}] or keytabPath [{}] is empty, skipping kerberos authentication",
+                    principal,
+                    keytabPath);
+        } else {
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                log.info(
+                        "Start Kerberos authentication using principal {} and 
keytab {}",
+                        principal,
+                        keytabPath);
+                UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+                log.info("Kerberos authentication successful");
+            } catch (IOException e) {
+                String errorMsg =
+                        String.format(
+                                "Kerberos authentication failed using this "
+                                        + "principal [%s] and keytab path 
[%s]",
+                                principal, keytabPath);
+                throw new JdbcConnectorException(KERBEROS_AUTHENTICATION_FAILED, errorMsg, e);
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index a086326764..306a0552cf 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
 import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
@@ -101,7 +101,7 @@ public class JdbcCatalogUtils {
         log.warn(
                 "Catalog not found, loading tables from jdbc directly. url : 
{}",
                 jdbcConnectionConfig.getUrl());
-        try (Connection connection = getConnection(jdbcConnectionConfig)) {
+        try (Connection connection = getConnection(jdbcConnectionConfig, 
jdbcDialect)) {
             log.info("Loading catalog tables for jdbc : {}", 
jdbcConnectionConfig.getUrl());
             for (JdbcSourceTableConfig tableConfig : tablesConfig) {
                 CatalogTable catalogTable = getCatalogTable(tableConfig, 
connection, jdbcDialect);
@@ -316,9 +316,9 @@ public class JdbcCatalogUtils {
                 resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper());
     }
 
-    private static Connection getConnection(JdbcConnectionConfig config)
+    private static Connection getConnection(JdbcConnectionConfig config, JdbcDialect jdbcDialect)
             throws SQLException, ClassNotFoundException {
-        SimpleJdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(config);
+        JdbcConnectionProvider connectionProvider = jdbcDialect.getJdbcConnectionProvider(config);
         return connectionProvider.getOrEstablishConnection();
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
index 8628e2b80b..1ec13aad68 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
@@ -91,6 +91,11 @@
             <artifactId>vertica-jdbc</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
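
Note that the test-scoped hive-jdbc dependency above only compiles the new IT; the driver actually exercised at run time is the 3.1.3 standalone jar fetched via driverUrl() in JdbcHiveIT below, which presumably keeps the driver version aligned with the apache/hive:3.1.3 container image.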
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
new file mode 100644
index 0000000000..69542598d4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class JdbcHiveIT extends AbstractJdbcIT {
+
+    private static final String HIVE_IMAGE = "apache/hive:3.1.3";
+    private static final String HIVE_CONTAINER_HOST = "e2ehivejdbc";
+
+    private static final String HIVE_DATABASE = "default";
+
+    private static final String HIVE_SOURCE = "e2e_table_source";
+    private static final String HIVE_USERNAME = "root";
+    private static final String HIVE_PASSWORD = null;
+    private static final int HIVE_PORT = 10000;
+    private static final String HIVE_URL = "jdbc:hive2://" + HOST + ":%s/%s";
+
+    private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_hive_source_and_assert.conf");
+    private static final String CREATE_SQL =
+            "CREATE TABLE hive_e2e_source_table"
+                    + "("
+                    + "    int_column              INT,"
+                    + "    integer_column          INTEGER,"
+                    + "    bigint_column           BIGINT,"
+                    + "    smallint_column         SMALLINT,"
+                    + "    tinyint_column          TINYINT,"
+                    + "    double_column           DOUBLE,"
+                    + "    double_PRECISION_column DOUBLE PRECISION,"
+                    + "    float_column            FLOAT,"
+                    + "    string_column           STRING,"
+                    + "    char_column             CHAR(10),"
+                    + "    varchar_column          VARCHAR(20),"
+                    + "    boolean_column          BOOLEAN,"
+                    + "    date_column             DATE,"
+                    + "    timestamp_column        TIMESTAMP,"
+                    + "    decimal_column          DECIMAL(10, 2),"
+                    + "    numeric_column          NUMERIC(10, 2)"
+                    + ")";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(HIVE_URL, HIVE_PORT, HIVE_DATABASE);
+        return JdbcCase.builder()
+                .dockerImage(HIVE_IMAGE)
+                .networkAliases(HIVE_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(HIVE_PORT)
+                .localPort(HIVE_PORT)
+                .jdbcTemplate(HIVE_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(HIVE_USERNAME)
+                .password(HIVE_PASSWORD)
+                .database(HIVE_DATABASE)
+                .sourceTable(HIVE_SOURCE)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .build();
+    }
+
+    protected void createNeededTables() {
+        try (Statement statement = connection.createStatement()) {
+            String createTemplate = jdbcCase.getCreateSql();
+            String createSource =
+                    String.format(
+                            createTemplate,
+                            buildTableInfoWithSchema(
+                                    jdbcCase.getDatabase(), jdbcCase.getSourceTable()));
+            statement.execute(createSource);
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+        }
+    }
+
+    protected void insertTestData() {
+        try (Statement statement = connection.createStatement()) {
+            for (int i = 1; i <= 3; i++) {
+                statement.execute(
+                        "INSERT INTO hive_e2e_source_table "
+                                + "VALUES (2,"
+                                + "        1,"
+                                + "        1234567890,"
+                                + "        32767,"
+                                + "        127,"
+                                + "        123.45,"
+                                + "        123.45,"
+                                + "        67.89,"
+                                + "        'Hello, Hive',"
+                                + "        'CharCol',"
+                                + "        'VarcharCol',"
+                                + "        TRUE,"
+                                + "        '2023-09-04',"
+                                + "        '2023-09-04 10:30:00',"
+                                + "        42.12,"
+                                + "        42.12)");
+            }
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+        }
+    }
+
+    @Override
+    void compareResult() {}
+
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar";
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        return null;
+    }
+
+    @Override
+    GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(HIVE_IMAGE)
+                        .withExposedPorts(HIVE_PORT)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HIVE_CONTAINER_HOST)
+                        .withEnv("SERVICE_NAME", "hiveserver2")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(HIVE_IMAGE)));
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", HIVE_PORT, HIVE_PORT)));
+        return container;
+    }
+
+    public void clearTable(String schema, String table) {
+        // do nothing.
+    }
+}
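
For a quick manual sanity check outside the harness, the same table can be read back with plain JDBC. A minimal sketch, assuming HiveServer2 is reachable on the localhost:10000 binding the container exposes above:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class HiveSmokeCheck {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            try (Connection conn =
                            DriverManager.getConnection(
                                    "jdbc:hive2://localhost:10000/default", "root", "");
                    Statement stmt = conn.createStatement();
                    ResultSet rs =
                            stmt.executeQuery("SELECT COUNT(*) FROM hive_e2e_source_table")) {
                rs.next();
                // insertTestData() above writes three identical rows.
                System.out.println("row count = " + rs.getLong(1));
            }
        }
    }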
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
new file mode 100644
index 0000000000..04b0240a3b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 5000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is an example source plugin, used only to test and demonstrate the source plugin feature
+    Jdbc {
+        url = "jdbc:hive2://e2ehivejdbc:10000/default"
+        user = "root"
+        driver = "org.apache.hive.jdbc.HiveDriver"
+        query = "select * from hive_e2e_source_table"
+        auto_commit = false
+    }
+}
+
+transform {
+}
+
+sink {
+  assert {
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MAX_ROW
+            rule_value = 3
+          },
+          {
+            rule_type = MIN_ROW
+            rule_value = 3
+          }
+        ],
+        field_rules = [
+          {
+            field_name = hive_e2e_source_table.int_column
+            field_type = int
+            field_value = [{equals_to = 2}]
+          },
+          {
+            field_name = hive_e2e_source_table.integer_column
+            field_type = int
+            field_value = [{equals_to = 1}]
+          },
+          {
+            field_name = hive_e2e_source_table.bigint_column
+            field_type = long
+            field_value = [{equals_to = 1234567890}]
+          },
+          {
+            field_name = hive_e2e_source_table.smallint_column
+            field_type = short
+            field_value = [{equals_to = 32767}]
+          },
+          {
+            field_name = hive_e2e_source_table.tinyint_column
+            field_type = byte
+            field_value = [{equals_to = 127}]
+          },
+          {
+            field_name = hive_e2e_source_table.double_column
+            field_type = double
+            field_value = [{equals_to = 123.45}]
+          },
+          {
+            field_name = hive_e2e_source_table.double_precision_column
+            field_type = double
+            field_value = [{equals_to = 123.45}]
+          },
+          {
+            field_name = hive_e2e_source_table.float_column
+            field_type = float
+            field_value = [{equals_to = 67.89}]
+          },
+          {
+            field_name = hive_e2e_source_table.string_column
+            field_type = string
+            field_value = [{equals_to = "Hello, Hive"}]
+          },
+          {
+            field_name = hive_e2e_source_table.char_column
+            field_type = string
+            field_value = [{equals_to = "CharCol   "}]
+          },
+          {
+            field_name = hive_e2e_source_table.varchar_column
+            field_type = string
+            field_value = [{equals_to = "VarcharCol"}]
+          },
+          {
+            field_name = hive_e2e_source_table.boolean_column
+            field_type = boolean
+            field_value = [{equals_to = "TRUE"}]
+          },
+          {
+            field_name = hive_e2e_source_table.date_column
+            field_type = date
+            field_value = [{equals_to = "2023-09-04"}]
+          },
+          {
+            field_name = hive_e2e_source_table.timestamp_column
+            field_type = timestamp
+            field_value = [{equals_to = "2023-09-04T10:30:00"}]
+          },
+          {
+            field_name = hive_e2e_source_table.decimal_column
+            field_type = decimal
+            field_value = [{equals_to = 42.12}]
+          },
+          {
+            field_name = hive_e2e_source_table.numeric_column
+            field_type = decimal
+            field_value = [{equals_to = 42.12}]
+          }
+        ]
+      }
+  }
+}
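
Outside the e2e harness, a config like this is typically submitted with the Zeta local-mode launcher, e.g. ./bin/seatunnel.sh --config jdbc_hive_source_and_assert.conf -e local, assuming the hive-jdbc standalone jar has been placed where the engine loads connector drivers so org.apache.hive.jdbc.HiveDriver resolves.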
