This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a64e177d06 [Feature][Hive JDBC Source] Support Hive JDBC Source Connector (#5424)
a64e177d06 is described below
commit a64e177d063f94842c54b563b64e3509663e7bf1
Author: Nick <[email protected]>
AuthorDate: Tue Nov 7 13:42:48 2023 +0800
[Feature][Hive JDBC Source] Support Hive JDBC Source Connector (#5424)
---
.../connector-v2/Error-Quick-Reference-Manual.md | 2 +
docs/en/connector-v2/source/Hive-jdbc.md | 164 +++++++++++++++++++
seatunnel-connectors-v2/connector-jdbc/pom.xml | 17 ++
.../jdbc/config/JdbcConnectionConfig.java | 43 ++++-
.../seatunnel/jdbc/config/JdbcOptions.java | 27 ++++
.../jdbc/exception/JdbcConnectorErrorCode.java | 5 +-
.../connection/SimpleJdbcConnectionProvider.java | 12 +-
.../jdbc/internal/dialect/JdbcDialect.java | 8 +
.../jdbc/internal/dialect/hive/HiveDialect.java | 66 ++++++++
.../internal/dialect/hive/HiveDialectFactory.java | 38 +++++
.../dialect/hive/HiveJdbcConnectionProvider.java | 79 ++++++++++
.../dialect/hive/HiveJdbcRowConverter.java | 43 +++++
.../jdbc/internal/dialect/hive/HiveTypeMapper.java | 120 ++++++++++++++
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 3 +-
.../seatunnel/jdbc/source/ChunkSplitter.java | 5 +-
.../seatunnel/jdbc/utils/HiveJdbcUtils.java | 68 ++++++++
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 8 +-
.../connector-jdbc-e2e-part-3/pom.xml | 5 +
.../connectors/seatunnel/jdbc/JdbcHiveIT.java | 173 +++++++++++++++++++++
.../resources/jdbc_hive_source_and_assert.conf | 141 +++++++++++++++++
20 files changed, 1013 insertions(+), 14 deletions(-)
diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 03c9412424..b7e59b74cf 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -194,6 +194,8 @@ problems encountered by users.
| JDBC-04 | Connector database failed | When users encounter this error code, it means that the database connection failed; check whether the url is correct and whether the corresponding service is running |
| JDBC-05 | transaction operation failed, such as (commit, rollback) etc.. | When users encounter this error code, it means that a sql transaction failed; check the transaction execution on the corresponding database to determine the cause of the failure |
| JDBC-06 | No suitable dialect factory found | When users encounter this error code, it means the dialect type may be unsupported |
+| JDBC-07 | The jdbc type does not support sink | When users encounter this error code, it means that the jdbc type does not support sink |
+| JDBC-08 | Kerberos authentication failed | When users encounter this error code, it means that Kerberos authentication for the database connection failed |
## Pulsar Connector Error Codes
diff --git a/docs/en/connector-v2/source/Hive-jdbc.md
b/docs/en/connector-v2/source/Hive-jdbc.md
new file mode 100644
index 0000000000..9b4c4e1102
--- /dev/null
+++ b/docs/en/connector-v2/source/Hive-jdbc.md
@@ -0,0 +1,164 @@
+# Hive
+
+> JDBC Hive Source Connector
+
+## Support Hive Version
+
+- Tested with 3.1.3 and 3.1.2; other versions may work but have not been verified.
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+> Supports query SQL and can achieve the projection effect.
+
+## Description
+
+Read external data source data through JDBC.
+
+## Supported DataSource Info
+
+| Datasource | Supported versions | Driver | Url | Maven |
+|------------|----------------------------------------------------------|---------------------------------|--------------------------------------|--------------------------------------------------------------------------|
+| Hive | Different dependency version has different driver class. | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000/default | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc) |
+
+## Database Dependency
+
+> Please download the driver jar listed under 'Maven' and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/'
+> working directory<br/>
+> For example, for the Hive datasource: cp hive-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+| Hive Data type | SeaTunnel Data type |
+|-----------------------------------------------------------------|---------------------|
+| BOOLEAN | BOOLEAN |
+| TINYINT<br/>SMALLINT | SHORT |
+| INT<br/>INTEGER | INT |
+| BIGINT | LONG |
+| FLOAT | FLOAT |
+| DOUBLE<br/>DOUBLE PRECISION | DOUBLE |
+| DECIMAL(x,y)<br/>NUMERIC(x,y)<br/>(column precision <= 38) | DECIMAL(x,y) |
+| DECIMAL(x,y)<br/>NUMERIC(x,y)<br/>(column precision > 38) | DECIMAL(38,18) |
+| CHAR<br/>VARCHAR<br/>STRING | STRING |
+| DATE | DATE |
+| DATETIME<br/>TIMESTAMP | TIMESTAMP |
+| BINARY<br/>ARRAY<br/>INTERVAL<br/>MAP<br/>STRUCT<br/>UNIONTYPE | Not supported yet |
+
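The DECIMAL row above can be sketched as a small rule (a minimal illustration, not the connector's actual type mapper; the class name is hypothetical): precisions up to 38 are preserved as-is, anything larger falls back to DECIMAL(38,18).

```java
// Sketch of the DECIMAL mapping rule described in the table above.
// Hive DECIMAL(p, s) keeps its precision and scale when p <= 38;
// a larger precision cannot be represented, so it falls back to DECIMAL(38,18).
public class DecimalMappingSketch {
    public static String mapDecimal(int precision, int scale) {
        if (precision <= 38) {
            return "DECIMAL(" + precision + "," + scale + ")";
        }
        return "DECIMAL(38,18)";
    }

    public static void main(String[] args) {
        System.out.println(mapDecimal(10, 2));  // DECIMAL(10,2)
        System.out.println(mapDecimal(40, 5));  // DECIMAL(38,18)
    }
}
```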
+## Source Options
+
+| Name | Type | Required | Default | Description |
+|------------------------------|------------|----------|-----------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:hive2://localhost:10000/default |
+| driver | String | Yes | - | The jdbc class name used to connect to the remote data source;<br/> for Hive the value is `org.apache.hive.jdbc.HiveDriver`. |
+| user | String | No | - | Connection instance user name |
+| password | String | No | - | Connection instance password |
+| query | String | Yes | - | Query statement |
+| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete |
+| partition_column | String | No | - | The column name for parallelism's partition; only a numeric-type column is supported, and only one column can be configured. |
+| partition_lower_bound | BigDecimal | No | - | The partition_column min value for scan; if not set, SeaTunnel will query the database for the min value. |
+| partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan; if not set, SeaTunnel will query the database for the max value. |
+| partition_num | Int | No | job parallelism | The number of partitions; only positive integers are supported. Defaults to the job parallelism. |
+| fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure<br/> the row fetch size used in the query to improve performance by<br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use the jdbc default value. |
+| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
+| use_kerberos | Boolean | No | false | Whether to enable Kerberos; default is false |
+| kerberos_principal | String | No | - | When using kerberos, set the kerberos principal, such as 'test_user@xxx'. |
+| kerberos_keytab_path | String | No | - | When using kerberos, set the kerberos keytab file path, such as '/home/test/test_user.keytab'. |
+| krb5_path | String | No | /etc/krb5.conf | When using kerberos, set the krb5 conf file path, such as '/seatunnel/krb5.conf', or use the default path '/etc/krb5.conf'. |
+
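For example, the Kerberos options above can be combined in a source definition like this (the principal, keytab, and krb5 paths are placeholders to replace with values from your environment):

```
source {
  Jdbc {
    url = "jdbc:hive2://localhost:10000/default"
    driver = "org.apache.hive.jdbc.HiveDriver"
    query = "select * from type_bin limit 16"
    use_kerberos = true
    kerberos_principal = "test_user@EXAMPLE.COM"
    kerberos_keytab_path = "/home/test/test_user.keytab"
    krb5_path = "/etc/krb5.conf"
  }
}
```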
+### Tips
+
+> If partition_column is not set, the query runs with a single concurrency; if partition_column is set, it is executed
+> in parallel according to the task concurrency. When your shard field is a large numeric type such as bigint (and
+> above) and the data is not evenly distributed, it is recommended to set the parallelism to 1 to avoid data skew
+> problems.
+
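The partition behavior above can be sketched as follows (a simplified illustration, not SeaTunnel's actual ChunkSplitter; the class and method names are hypothetical): the range [partition_lower_bound, partition_upper_bound] is cut into partition_num contiguous ranges, one per parallel reader.

```java
import java.util.Arrays;

public class PartitionSplitSketch {
    // Returns partitionNum [start, end] ranges (inclusive) covering [lowerBound, upperBound],
    // spreading any remainder over the first partitions so sizes differ by at most one.
    public static long[][] split(long lowerBound, long upperBound, int partitionNum) {
        long total = upperBound - lowerBound + 1;
        long[][] ranges = new long[partitionNum][2];
        long start = lowerBound;
        for (int i = 0; i < partitionNum; i++) {
            long size = total / partitionNum + (i < total % partitionNum ? 1 : 0);
            ranges[i][0] = start;
            ranges[i][1] = start + size - 1;
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Matches the "Parallel Boundary" example below: bounds 1..500, 10 partitions.
        long[][] ranges = split(1, 500, 10);
        System.out.println(Arrays.toString(ranges[0]));  // [1, 50]
        System.out.println(Arrays.toString(ranges[9]));  // [451, 500]
    }
}
```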
+## Task Example
+
+### Simple:
+
+> This example queries 16 rows from the type_bin table in your test database with a single concurrency (no
+> partition_column is configured) and reads all of its fields. You can also specify which fields to query for final
+> output to the console.
+
+```
+# Defining the runtime environment
+env {
+ # You can set flink configuration here
+ execution.parallelism = 2
+ job.mode = "BATCH"
+}
+source {
+ Jdbc {
+ url = "jdbc:hive2://localhost:10000/default"
+ driver = "org.apache.hive.jdbc.HiveDriver"
+ connection_check_timeout_sec = 100
+ query = "select * from type_bin limit 16"
+ }
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
+}
+
+sink {
+ Console {}
+}
+```
+
+### Parallel:
+
+> Read your query table in parallel using the shard field you configured and its shard data. You can do this if you
+> want to read the whole table
+
+```
+source {
+ Jdbc {
+ url = "jdbc:hive2://localhost:10000/default"
+ driver = "org.apache.hive.jdbc.HiveDriver"
+ connection_check_timeout_sec = 100
+ # Define query logic as required
+ query = "select * from type_bin"
+ # Parallel sharding reads fields
+ partition_column = "id"
+ # Number of fragments
+ partition_num = 10
+ }
+}
+```
+
+### Parallel Boundary:
+
+> It is more efficient to read your data source within the upper and lower bounds you configure for the query.
+
+```
+source {
+ Jdbc {
+ url = "jdbc:hive2://localhost:10000/default"
+ driver = "org.apache.hive.jdbc.HiveDriver"
+ connection_check_timeout_sec = 100
+ # Define query logic as required
+ query = "select * from type_bin"
+ partition_column = "id"
+ # Read start boundary
+ partition_lower_bound = 1
+ # Read end boundary
+ partition_upper_bound = 500
+ partition_num = 10
+ }
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 2d081fa583..6c64c9c667 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -48,6 +48,7 @@
<hikari.version>4.0.3</hikari.version>
<postgis.jdbc.version>2.5.1</postgis.jdbc.version>
<kingbase8.version>8.6.0</kingbase8.version>
+ <hive.jdbc.version>3.1.3</hive.jdbc.version>
<oceanbase.jdbc.version>2.4.3</oceanbase.jdbc.version>
</properties>
@@ -169,6 +170,18 @@
<version>${kingbase8.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <version>${hive.jdbc.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>jdk.tools</groupId>
+ <artifactId>jdk.tools</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
@@ -269,6 +282,10 @@
<groupId>cn.com.kingbase</groupId>
<artifactId>kingbase8</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ </dependency>
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index eeeff227b2..2362f827b6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -47,6 +47,14 @@ public class JdbcConnectionConfig implements Serializable {
    public int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
+ public boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
+
+ public String kerberosPrincipal;
+
+ public String kerberosKeytabPath;
+
+ public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
+
private Map<String, String> properties;
public static JdbcConnectionConfig of(ReadonlyConfig config) {
@@ -64,7 +72,12 @@ public class JdbcConnectionConfig implements Serializable {
builder.transactionTimeoutSec(config.get(JdbcOptions.TRANSACTION_TIMEOUT_SEC));
builder.maxRetries(0);
}
-
+        if (config.get(JdbcOptions.USE_KERBEROS)) {
+            builder.useKerberos(config.get(JdbcOptions.USE_KERBEROS));
+            builder.kerberosPrincipal(config.get(JdbcOptions.KERBEROS_PRINCIPAL));
+            builder.kerberosKeytabPath(config.get(JdbcOptions.KERBEROS_KEYTAB_PATH));
+            builder.krb5Path(config.get(JdbcOptions.KRB5_PATH));
+        }
config.getOptional(JdbcOptions.USER).ifPresent(builder::username);
config.getOptional(JdbcOptions.PASSWORD).ifPresent(builder::password);
config.getOptional(JdbcOptions.PROPERTIES).ifPresent(builder::properties);
@@ -143,6 +156,10 @@ public class JdbcConnectionConfig implements Serializable {
        private int maxCommitAttempts = JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
        private int transactionTimeoutSec = JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
private Map<String, String> properties;
+ public boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
+ public String kerberosPrincipal;
+ public String kerberosKeytabPath;
+ public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
private Builder() {}
@@ -211,6 +228,26 @@ public class JdbcConnectionConfig implements Serializable {
return this;
}
+ public Builder useKerberos(boolean useKerberos) {
+ this.useKerberos = useKerberos;
+ return this;
+ }
+
+ public Builder kerberosPrincipal(String kerberosPrincipal) {
+ this.kerberosPrincipal = kerberosPrincipal;
+ return this;
+ }
+
+ public Builder kerberosKeytabPath(String kerberosKeytabPath) {
+ this.kerberosKeytabPath = kerberosKeytabPath;
+ return this;
+ }
+
+ public Builder krb5Path(String krb5Path) {
+ this.krb5Path = krb5Path;
+ return this;
+ }
+
public Builder properties(Map<String, String> properties) {
this.properties = properties;
return this;
@@ -230,6 +267,10 @@ public class JdbcConnectionConfig implements Serializable {
            jdbcConnectionConfig.transactionTimeoutSec = this.transactionTimeoutSec;
            jdbcConnectionConfig.maxCommitAttempts = this.maxCommitAttempts;
            jdbcConnectionConfig.xaDataSourceClassName = this.xaDataSourceClassName;
+            jdbcConnectionConfig.useKerberos = this.useKerberos;
+            jdbcConnectionConfig.kerberosPrincipal = this.kerberosPrincipal;
+            jdbcConnectionConfig.kerberosKeytabPath = this.kerberosKeytabPath;
+            jdbcConnectionConfig.krb5Path = this.krb5Path;
            jdbcConnectionConfig.properties =
                    this.properties == null ? new HashMap<>() : this.properties;
            return jdbcConnectionConfig;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 8f57837078..0493eb8e4f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -180,6 +180,33 @@ public interface JdbcOptions {
.noDefaultValue()
.withDescription("Whether case conversion is required");
+    Option<Boolean> USE_KERBEROS =
+            Options.key("use_kerberos")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable Kerberos, default is false.");
+
+    Option<String> KERBEROS_PRINCIPAL =
+            Options.key("kerberos_principal")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When use kerberos, we should set kerberos principal such as 'test_user@xxx'.");
+
+    Option<String> KERBEROS_KEYTAB_PATH =
+            Options.key("kerberos_keytab_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When use kerberos, we should set kerberos keytab file path such as '/home/test/test_user.keytab'.");
+
+    Option<String> KRB5_PATH =
+            Options.key("krb5_path")
+                    .stringType()
+                    .defaultValue("/etc/krb5.conf")
+                    .withDescription(
+                            "When use kerberos, we should set krb5 conf file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf'.");
+
Option<Map<String, String>> PROPERTIES =
Options.key("properties")
.mapType()
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
index 3d53b102bd..22438de84c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/exception/JdbcConnectorErrorCode.java
@@ -27,8 +27,9 @@ public enum JdbcConnectorErrorCode implements
SeaTunnelErrorCode {
CONNECT_DATABASE_FAILED("JDBC-04", "Connector database failed"),
TRANSACTION_OPERATION_FAILED(
"JDBC-05", "transaction operation failed, such as (commit,
rollback) etc.."),
-    NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found");
-
+    NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found"),
+    DONT_SUPPORT_SINK("JDBC-07", "The jdbc type does not support sink"),
+    KERBEROS_AUTHENTICATION_FAILED("JDBC-08", "Kerberos authentication failed");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
index 40b75ced6a..af329a99d4 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -43,7 +43,7 @@ public class SimpleJdbcConnectionProvider implements
JdbcConnectionProvider, Ser
private static final long serialVersionUID = 1L;
- private final JdbcConnectionConfig jdbcConfig;
+ protected final JdbcConnectionConfig jdbcConfig;
private transient Driver loadedDriver;
protected transient Connection connection;
@@ -88,7 +88,7 @@ public class SimpleJdbcConnectionProvider implements
JdbcConnectionProvider, Ser
}
}
-    private Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
+    protected Driver getLoadedDriver() throws SQLException, ClassNotFoundException {
         if (loadedDriver == null) {
             loadedDriver = loadDriver(jdbcConfig.getDriverName());
         }
@@ -141,4 +141,12 @@ public class SimpleJdbcConnectionProvider implements
JdbcConnectionProvider, Ser
closeConnection();
return getOrEstablishConnection();
}
+
+ public JdbcConnectionConfig getJdbcConfig() {
+ return jdbcConfig;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
index 0f554df3e8..aa7c03ccdb 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java
@@ -18,6 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
@@ -391,4 +394,9 @@ public interface JdbcDialect extends Serializable {
}
}
}
+
+ default JdbcConnectionProvider getJdbcConnectionProvider(
+ JdbcConnectionConfig jdbcConnectionConfig) {
+ return new SimpleJdbcConnectionProvider(jdbcConnectionConfig);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
new file mode 100644
index 0000000000..08e68632f7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.Connection;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Optional;
+
+public class HiveDialect implements JdbcDialect {
+
+ @Override
+ public String dialectName() {
+ return DatabaseIdentifier.HIVE;
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new HiveJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+ return new HiveTypeMapper();
+ }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        return Optional.empty();
+    }
+
+    @Override
+    public ResultSetMetaData getResultSetMetaData(Connection conn, String query) throws SQLException {
+        return conn.prepareStatement(query).executeQuery().getMetaData();
+    }
+
+ @Override
+ public JdbcConnectionProvider getJdbcConnectionProvider(
+ JdbcConnectionConfig jdbcConnectionConfig) {
+ return new HiveJdbcConnectionProvider(jdbcConnectionConfig);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
new file mode 100644
index 0000000000..56bd81b7f8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link HiveDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class HiveDialectFactory implements JdbcDialectFactory {
+
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:hive2:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new HiveDialect();
+ }
+}
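The `acceptsURL` check above is how a dialect gets selected for a connection string. A simplified sketch of that lookup (hypothetical class and record names, not SeaTunnel's actual factory-loading code, which discovers factories via `@AutoService`): each registered factory declares which JDBC URLs it accepts, and the first match wins.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class DialectLookupSketch {
    // A registered dialect factory: a name plus a URL-acceptance test.
    record Factory(String name, Predicate<String> acceptsUrl) {}

    // Returns the name of the first factory whose acceptsUrl test matches,
    // or empty when no dialect supports the URL (JDBC-06 territory).
    public static Optional<String> chooseDialect(List<Factory> factories, String url) {
        return factories.stream()
                .filter(f -> f.acceptsUrl().test(url))
                .map(Factory::name)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Factory> factories = List.of(
                new Factory("Hive", u -> u.startsWith("jdbc:hive2:")),
                new Factory("MySQL", u -> u.startsWith("jdbc:mysql:")));
        System.out.println(chooseDialect(factories, "jdbc:hive2://localhost:10000/default"));  // Optional[Hive]
    }
}
```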
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
new file mode 100644
index 0000000000..1a45a8600a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.util.Properties;
+
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode.KERBEROS_AUTHENTICATION_FAILED;
+
+public class HiveJdbcConnectionProvider extends SimpleJdbcConnectionProvider {
+
+ public HiveJdbcConnectionProvider(@NonNull JdbcConnectionConfig
jdbcConfig) {
+ super(jdbcConfig);
+ }
+
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
+        if (isConnectionValid()) {
+            return super.getConnection();
+        }
+        JdbcConnectionConfig jdbcConfig = super.getJdbcConfig();
+        if (jdbcConfig.useKerberos) {
+            System.setProperty("java.security.krb5.conf", jdbcConfig.krb5Path);
+            Configuration configuration = new Configuration();
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                UserGroupInformation.loginUserFromKeytab(
+                        jdbcConfig.kerberosPrincipal, jdbcConfig.kerberosKeytabPath);
+            } catch (IOException e) {
+                throw new JdbcConnectorException(KERBEROS_AUTHENTICATION_FAILED, e);
+            }
+        }
+        Driver driver = getLoadedDriver();
+        Properties info = new Properties();
+        if (super.getJdbcConfig().getUsername().isPresent()) {
+            info.setProperty("user", super.getJdbcConfig().getUsername().get());
+        }
+        if (super.getJdbcConfig().getPassword().isPresent()) {
+            info.setProperty("password", super.getJdbcConfig().getPassword().get());
+        }
+        super.setConnection(driver.connect(super.getJdbcConfig().getUrl(), info));
+        if (super.getConnection() == null) {
+            // Throw the same exception as DriverManager.getConnection when no driver
+            // matches, to meet caller expectations.
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.NO_SUITABLE_DRIVER,
+                    "No suitable driver found for " + super.getJdbcConfig().getUrl());
+        }
+        return super.getConnection();
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
new file mode 100644
index 0000000000..91ed90105c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcRowConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import java.sql.PreparedStatement;
+
+public class HiveJdbcRowConverter extends AbstractJdbcRowConverter {
+
+ @Override
+ public String converterName() {
+ return DatabaseIdentifier.HIVE;
+ }
+
+ @Override
+ public PreparedStatement toExternal(
+ SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement) {
+ throw new JdbcConnectorException(
+ JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
+ "The Hive JDBC connector doesn't support sink");
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveTypeMapper.java
new file mode 100644
index 0000000000..0940b5a2c6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveTypeMapper.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class HiveTypeMapper implements JdbcDialectTypeMapper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTypeMapper.class);
+
+ // reference https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types
+
+ // Numeric Types
+ private static final String HIVE_TINYINT = "TINYINT";
+ private static final String HIVE_SMALLINT = "SMALLINT";
+ private static final String HIVE_INT = "INT";
+ private static final String HIVE_INTEGER = "INTEGER";
+ private static final String HIVE_BIGINT = "BIGINT";
+ private static final String HIVE_FLOAT = "FLOAT";
+ private static final String HIVE_DOUBLE = "DOUBLE";
+ private static final String HIVE_DOUBLE_PRECISION = "DOUBLE PRECISION";
+ private static final String HIVE_DECIMAL = "DECIMAL";
+ private static final String HIVE_NUMERIC = "NUMERIC";
+ // Date/Time Types
+ private static final String HIVE_TIMESTAMP = "TIMESTAMP";
+ private static final String HIVE_DATE = "DATE";
+ private static final String HIVE_INTERVAL = "INTERVAL";
+ // String Types
+ private static final String HIVE_STRING = "STRING";
+ private static final String HIVE_VARCHAR = "VARCHAR";
+ private static final String HIVE_CHAR = "CHAR";
+ // Misc Types
+ private static final String HIVE_BOOLEAN = "BOOLEAN";
+ private static final String HIVE_BINARY = "BINARY";
+ // Complex Types
+ private static final String HIVE_ARRAY = "ARRAY";
+ private static final String HIVE_MAP = "MAP";
+ private static final String HIVE_STRUCT = "STRUCT";
+ private static final String HIVE_UNIONTYPE = "UNIONTYPE";
+
+ @Override
+ public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+ throws SQLException {
+ String columnType = metadata.getColumnTypeName(colIndex).toUpperCase();
+ int precision = metadata.getPrecision(colIndex);
+ switch (columnType) {
+ case HIVE_TINYINT:
+ return BasicType.BYTE_TYPE;
+ case HIVE_SMALLINT:
+ return BasicType.SHORT_TYPE;
+ case HIVE_INT:
+ case HIVE_INTEGER:
+ return BasicType.INT_TYPE;
+ case HIVE_BIGINT:
+ return BasicType.LONG_TYPE;
+ case HIVE_FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case HIVE_DOUBLE:
+ case HIVE_DOUBLE_PRECISION:
+ return BasicType.DOUBLE_TYPE;
+ case HIVE_DECIMAL:
+ case HIVE_NUMERIC:
+ if (precision > 0) {
+ return new DecimalType(precision, metadata.getScale(colIndex));
+ }
+ LOG.warn("decimal did not define precision/scale; defaulting to Decimal(38,18)");
+ return new DecimalType(38, 18);
+ case HIVE_TIMESTAMP:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case HIVE_DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ case HIVE_STRING:
+ case HIVE_VARCHAR:
+ case HIVE_CHAR:
+ return BasicType.STRING_TYPE;
+ case HIVE_BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case HIVE_BINARY:
+ case HIVE_ARRAY:
+ case HIVE_INTERVAL:
+ case HIVE_MAP:
+ case HIVE_STRUCT:
+ case HIVE_UNIONTYPE:
+ default:
+ final String jdbcColumnName = metadata.getColumnName(colIndex);
+ throw new JdbcConnectorException(
+ CommonErrorCode.UNSUPPORTED_OPERATION,
+ String.format(
+ "Doesn't support hive type '%s' on column '%s' yet.",
+ columnType, jdbcColumnName));
+ }
+ }
+}
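The DECIMAL/NUMERIC branch above falls back to a wide default when the JDBC metadata reports no precision. A minimal standalone sketch of that fallback, using plain type-name strings in place of SeaTunnel's `DecimalType` so it runs without the connector on the classpath:

```java
// Standalone sketch of HiveTypeMapper's DECIMAL fallback: when the JDBC
// driver reports a precision of 0, the mapper defaults to DECIMAL(38,18).
// Plain strings stand in for SeaTunnel's DecimalType here.
public class DecimalFallbackSketch {
    static String mapDecimal(int precision, int scale) {
        if (precision > 0) {
            return String.format("DECIMAL(%d,%d)", precision, scale);
        }
        // 38 is Hive's maximum decimal precision; 18 is the chosen default scale.
        return "DECIMAL(38,18)";
    }

    public static void main(String[] args) {
        System.out.println(mapDecimal(10, 2)); // DECIMAL(10,2)
        System.out.println(mapDecimal(0, 0));  // DECIMAL(38,18)
    }
}
```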
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 376515d91e..5b6682cf8e 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -30,7 +30,6 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
@@ -68,7 +67,7 @@ public class JdbcSinkWriter
this.rowType = rowType;
this.primaryKeyIndex = primaryKeyIndex;
this.connectionProvider =
- new SimpleJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
+ dialect.getJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
this.outputFormat =
new JdbcOutputFormatBuilder(dialect, connectionProvider, jdbcSinkConfig, rowType)
.build();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index 720bbfdbeb..355e95cc81 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
@@ -64,11 +63,11 @@ public abstract class ChunkSplitter implements AutoCloseable, Serializable {
this.config = config;
this.autoCommit = config.getJdbcConnectionConfig().isAutoCommit();
this.fetchSize = config.getFetchSize();
- this.connectionProvider =
- new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
this.jdbcDialect =
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(),
config.getCompatibleMode());
+ this.connectionProvider =
+ jdbcDialect.getJdbcConnectionProvider(config.getJdbcConnectionConfig());
}
public static ChunkSplitter create(JdbcSourceConfig config) {
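The hunks above replace direct `SimpleJdbcConnectionProvider` construction with `dialect.getJdbcConnectionProvider(...)`, so Hive can supply a Kerberos-aware provider while other dialects keep the default. A toy sketch of that dispatch pattern; the interface and class bodies below are illustrative stand-ins, not the connector's real types:

```java
// Toy sketch of dialect-dispatched connection providers: most dialects share
// a default provider, while HiveDialect overrides to supply its own.
// Names and return values here are illustrative, not connector classes.
public class DialectProviderSketch {
    interface ConnectionProvider {
        String name();
    }

    interface Dialect {
        // Default used by most databases, mirroring SimpleJdbcConnectionProvider.
        default ConnectionProvider getJdbcConnectionProvider() {
            return () -> "simple";
        }
    }

    static class HiveDialect implements Dialect {
        @Override
        public ConnectionProvider getJdbcConnectionProvider() {
            return () -> "hive"; // would wrap Kerberos login in the real connector
        }
    }

    public static void main(String[] args) {
        Dialect generic = new Dialect() {};
        System.out.println(generic.getJdbcConnectionProvider().name());           // simple
        System.out.println(new HiveDialect().getJdbcConnectionProvider().name()); // hive
    }
}
```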
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
new file mode 100644
index 0000000000..0b503b3084
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+
+import static org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode.KERBEROS_AUTHENTICATION_FAILED;
+
+@Slf4j
+public class HiveJdbcUtils {
+
+ public static synchronized void doKerberosAuthentication(JdbcConnectionConfig jdbcConfig) {
+ String principal = jdbcConfig.kerberosPrincipal;
+ String keytabPath = jdbcConfig.kerberosKeytabPath;
+ String krb5Path = jdbcConfig.krb5Path;
+ System.setProperty("java.security.krb5.conf", krb5Path);
+ Configuration configuration = new Configuration();
+
+ if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) {
+ log.warn(
+ "Principal [{}] or keytabPath [{}] is empty; skipping kerberos authentication",
+ principal,
+ keytabPath);
+ } else {
+ configuration.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ log.info(
+ "Start Kerberos authentication using principal {} and keytab {}",
+ principal,
+ keytabPath);
+ UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+ log.info("Kerberos authentication successful");
+ } catch (IOException e) {
+ String errorMsg =
+ String.format(
+ "Kerberos authentication failed using this principal [%s] and keytab path [%s]",
+ principal, keytabPath);
+ throw new JdbcConnectorException(KERBEROS_AUTHENTICATION_FAILED, errorMsg, e);
+ }
+ }
+ }
+}
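HiveJdbcUtils only attempts the Kerberos login when both the principal and the keytab path are set. A dependency-free sketch of that guard; the real method additionally sets `java.security.krb5.conf` and calls `UserGroupInformation.loginUserFromKeytab`, which needs Hadoop on the classpath, and the principal/keytab values below are illustrative:

```java
// Dependency-free sketch of HiveJdbcUtils' guard: Kerberos login is attempted
// only when both principal and keytab path are non-blank; otherwise the
// authentication step is skipped with a warning.
public class KerberosGuardSketch {
    // Mirrors StringUtils.isBlank from commons-lang3.
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Returns true when a Kerberos login would be attempted. */
    static boolean shouldAuthenticate(String principal, String keytabPath) {
        return !isBlank(principal) && !isBlank(keytabPath);
    }

    public static void main(String[] args) {
        // Example principal/keytab values, not from the commit.
        System.out.println(shouldAuthenticate("hive/[email protected]", "/etc/hive.keytab")); // true
        System.out.println(shouldAuthenticate(null, "/etc/hive.keytab"));                    // false
        System.out.println(shouldAuthenticate("hive/[email protected]", "  "));              // false
    }
}
```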
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index a086326764..306a0552cf 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -32,7 +32,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceTableConfig;
-import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable;
@@ -101,7 +101,7 @@ public class JdbcCatalogUtils {
log.warn(
"Catalog not found, loading tables from jdbc directly. url : {}",
jdbcConnectionConfig.getUrl());
- try (Connection connection = getConnection(jdbcConnectionConfig)) {
+ try (Connection connection = getConnection(jdbcConnectionConfig, jdbcDialect)) {
log.info("Loading catalog tables for jdbc : {}", jdbcConnectionConfig.getUrl());
for (JdbcSourceTableConfig tableConfig : tablesConfig) {
CatalogTable catalogTable = getCatalogTable(tableConfig, connection, jdbcDialect);
@@ -316,9 +316,9 @@ public class JdbcCatalogUtils {
resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper());
}
- private static Connection getConnection(JdbcConnectionConfig config)
+ private static Connection getConnection(JdbcConnectionConfig config, JdbcDialect jdbcDialect)
throws SQLException, ClassNotFoundException {
- SimpleJdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(config);
+ JdbcConnectionProvider connectionProvider = jdbcDialect.getJdbcConnectionProvider(config);
return connectionProvider.getOrEstablishConnection();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
index 8628e2b80b..1ec13aad68 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/pom.xml
@@ -91,6 +91,11 @@
<artifactId>vertica-jdbc</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-jdbc</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
new file mode 100644
index 0000000000..69542598d4
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHiveIT.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class JdbcHiveIT extends AbstractJdbcIT {
+
+ private static final String HIVE_IMAGE = "apache/hive:3.1.3";
+ private static final String HIVE_CONTAINER_HOST = "e2ehivejdbc";
+
+ private static final String HIVE_DATABASE = "default";
+
+ private static final String HIVE_SOURCE = "e2e_table_source";
+ private static final String HIVE_USERNAME = "root";
+ private static final String HIVE_PASSWORD = null;
+ private static final int HIVE_PORT = 10000;
+ private static final String HIVE_URL = "jdbc:hive2://" + HOST + ":%s/%s";
+
+ private static final String DRIVER_CLASS = "org.apache.hive.jdbc.HiveDriver";
+
+ private static final List<String> CONFIG_FILE =
+ Lists.newArrayList("/jdbc_hive_source_and_assert.conf");
+ private static final String CREATE_SQL =
+ "CREATE TABLE hive_e2e_source_table"
+ + "("
+ + " int_column INT,"
+ + " integer_column INTEGER,"
+ + " bigint_column BIGINT,"
+ + " smallint_column SMALLINT,"
+ + " tinyint_column TINYINT,"
+ + " double_column DOUBLE,"
+ + " double_PRECISION_column DOUBLE PRECISION,"
+ + " float_column FLOAT,"
+ + " string_column STRING,"
+ + " char_column CHAR(10),"
+ + " varchar_column VARCHAR(20),"
+ + " boolean_column BOOLEAN,"
+ + " date_column DATE,"
+ + " timestamp_column TIMESTAMP,"
+ + " decimal_column DECIMAL(10, 2),"
+ + " numeric_column NUMERIC(10, 2)"
+ + ")";
+
+ @Override
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ String jdbcUrl = String.format(HIVE_URL, HIVE_PORT, HIVE_DATABASE);
+ return JdbcCase.builder()
+ .dockerImage(HIVE_IMAGE)
+ .networkAliases(HIVE_CONTAINER_HOST)
+ .containerEnv(containerEnv)
+ .driverClass(DRIVER_CLASS)
+ .host(HOST)
+ .port(HIVE_PORT)
+ .localPort(HIVE_PORT)
+ .jdbcTemplate(HIVE_URL)
+ .jdbcUrl(jdbcUrl)
+ .userName(HIVE_USERNAME)
+ .password(HIVE_PASSWORD)
+ .database(HIVE_DATABASE)
+ .sourceTable(HIVE_SOURCE)
+ .createSql(CREATE_SQL)
+ .configFile(CONFIG_FILE)
+ .build();
+ }
+
+ protected void createNeededTables() {
+ try (Statement statement = connection.createStatement()) {
+ String createTemplate = jdbcCase.getCreateSql();
+ String createSource =
+ String.format(
+ createTemplate,
+ buildTableInfoWithSchema(
+ jdbcCase.getDatabase(), jdbcCase.getSourceTable()));
+ statement.execute(createSource);
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+ }
+ }
+
+ protected void insertTestData() {
+ try (Statement statement = connection.createStatement()) {
+ for (int i = 1; i <= 3; i++) {
+ statement.execute(
+ "INSERT INTO hive_e2e_source_table "
+ + "VALUES (2,"
+ + " 1,"
+ + " 1234567890,"
+ + " 32767,"
+ + " 127,"
+ + " 123.45,"
+ + " 123.45,"
+ + " 67.89,"
+ + " 'Hello, Hive',"
+ + " 'CharCol',"
+ + " 'VarcharCol',"
+ + " TRUE,"
+ + " '2023-09-04',"
+ + " '2023-09-04 10:30:00',"
+ + " 42.12,"
+ + " 42.12)");
+ }
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, exception);
+ }
+ }
+
+ @Override
+ void compareResult() {}
+
+ @Override
+ String driverUrl() {
+ return "https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar";
+ }
+
+ @Override
+ Pair<String[], List<SeaTunnelRow>> initTestData() {
+ return null;
+ }
+
+ @Override
+ GenericContainer<?> initContainer() {
+ GenericContainer<?> container =
+ new GenericContainer<>(HIVE_IMAGE)
+ .withExposedPorts(HIVE_PORT)
+ .withNetwork(NETWORK)
+ .withNetworkAliases(HIVE_CONTAINER_HOST)
+ .withEnv("SERVICE_NAME", "hiveserver2")
+ .withLogConsumer(
+ new Slf4jLogConsumer(DockerLoggerFactory.getLogger(HIVE_IMAGE)));
+ container.setPortBindings(Lists.newArrayList(String.format("%s:%s", HIVE_PORT, HIVE_PORT)));
+ return container;
+ }
+
+ public void clearTable(String schema, String table) {
+ // do nothing.
+ }
+}
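JdbcHiveIT fills its URL template `"jdbc:hive2://" + HOST + ":%s/%s"` with the HiveServer2 port and the database name. A tiny sketch of that templating, with `"localhost"` standing in for the test's shared `HOST` constant from AbstractJdbcIT:

```java
// Sketch of how JdbcHiveIT builds its JDBC URL: the template's first %s is
// the HiveServer2 port (10000) and the second is the database ("default").
// "localhost" here is a stand-in for the test's HOST constant.
public class HiveUrlSketch {
    static final String HIVE_URL_TEMPLATE = "jdbc:hive2://localhost:%s/%s";

    static String buildUrl(int port, String database) {
        return String.format(HIVE_URL_TEMPLATE, port, database);
    }

    public static void main(String[] args) {
        System.out.println(buildUrl(10000, "default")); // jdbc:hive2://localhost:10000/default
    }
}
```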
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
new file mode 100644
index 0000000000..04b0240a3b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+ Jdbc {
+ url = "jdbc:hive2://e2ehivejdbc:10000/default"
+ user = "root"
+ driver = "org.apache.hive.jdbc.HiveDriver"
+ query = "select * from hive_e2e_source_table"
+ auto_commit = false
+ }
+}
+
+transform {
+}
+
+sink{
+ assert {
+ rules =
+ {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = hive_e2e_source_table.int_column
+ field_type = int
+ field_value = [{equals_to = 2}]
+ },
+ {
+ field_name = hive_e2e_source_table.integer_column
+ field_type = int
+ field_value = [{equals_to = 1}]
+ },
+ {
+ field_name = hive_e2e_source_table.bigint_column
+ field_type = long
+ field_value = [{equals_to = 1234567890}]
+ },
+ {
+ field_name = hive_e2e_source_table.smallint_column
+ field_type = short
+ field_value = [{equals_to = 32767}]
+ },
+ {
+ field_name = hive_e2e_source_table.tinyint_column
+ field_type = byte
+ field_value = [{equals_to = 127}]
+ },
+ {
+ field_name = hive_e2e_source_table.double_column
+ field_type = double
+ field_value = [{equals_to = 123.45}]
+ },
+ {
+ field_name = hive_e2e_source_table.double_precision_column
+ field_type = double
+ field_value = [{equals_to = 123.45}]
+ },
+ {
+ field_name = hive_e2e_source_table.float_column
+ field_type = float
+ field_value = [{equals_to = 67.89}]
+ },
+ {
+ field_name = hive_e2e_source_table.string_column
+ field_type = string
+ field_value = [{equals_to = "Hello, Hive"}]
+ },
+ {
+ field_name = hive_e2e_source_table.char_column
+ field_type = string
+ field_value = [{equals_to = "CharCol "}]
+ },
+ {
+ field_name = hive_e2e_source_table.varchar_column
+ field_type = string
+ field_value = [{equals_to = "VarcharCol"}]
+ },
+ {
+ field_name = hive_e2e_source_table.boolean_column
+ field_type = boolean
+ field_value = [{equals_to = "TRUE"}]
+ },
+ {
+ field_name = hive_e2e_source_table.date_column
+ field_type = date
+ field_value = [{equals_to = "2023-09-04"}]
+ },
+ {
+ field_name = hive_e2e_source_table.timestamp_column
+ field_type = timestamp
+ field_value = [{equals_to = "2023-09-04T10:30:00"}]
+ },
+ {
+ field_name = hive_e2e_source_table.decimal_column
+ field_type = decimal
+ field_value = [{equals_to = 42.12}]
+ },
+ {
+ field_name = hive_e2e_source_table.numeric_column
+ field_type = decimal
+ field_value = [{equals_to = 42.12}]
+ },
+ ]
+ }
+ }
+}
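One assertion above that can trip readers up: `char_column` is declared `CHAR(10)`, so Hive pads the stored `"CharCol"` with trailing spaces to the declared length, and the `equals_to` value must include that padding. A small sketch of the padding rule, assuming standard fixed-length CHAR semantics (this is not connector code):

```java
// Sketch of Hive CHAR(n) semantics: values shorter than n are stored padded
// with trailing spaces, which is why the char_column assertion expects padding.
public class CharPaddingSketch {
    static String padToCharLength(String value, int n) {
        StringBuilder sb = new StringBuilder(value);
        while (sb.length() < n) {
            sb.append(' ');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // "CharCol" is 7 characters, so CHAR(10) stores it with 3 trailing spaces.
        System.out.println("[" + padToCharLength("CharCol", 10) + "]"); // [CharCol   ]
    }
}
```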