This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 807c708026 [Improve][Connector-v2][Hive] Add socket and connection 
timeout options for Hive JDBC connections (#10254)
807c708026 is described below

commit 807c7080265eafe0f83f02e6720ce491ffb4a757
Author: Jast <[email protected]>
AuthorDate: Sun Jan 4 20:50:03 2026 +0800

    [Improve][Connector-v2][Hive] Add socket and connection timeout options for 
Hive JDBC connections (#10254)
---
 docs/en/connector-v2/source/HiveJdbc.md            |  6 ++
 docs/zh/connector-v2/source/HiveJdbc.md            |  6 ++
 .../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 88 ++++++++++++----------
 .../seatunnel/jdbc/config/JdbcCommonOptions.java   | 14 ++++
 .../jdbc/config/JdbcConnectionConfig.java          | 20 +++++
 .../dialect/hive/HiveJdbcConnectionProvider.java   | 29 ++++++-
 6 files changed, 124 insertions(+), 39 deletions(-)

diff --git a/docs/en/connector-v2/source/HiveJdbc.md 
b/docs/en/connector-v2/source/HiveJdbc.md
index c4a6ccdbfc..9403b6facc 100644
--- a/docs/en/connector-v2/source/HiveJdbc.md
+++ b/docs/en/connector-v2/source/HiveJdbc.md
@@ -8,6 +8,10 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 
 - Definitely supports 3.1.3 and 3.1.2, other versions need to be tested.
 
+## Timeout Parameter Support
+
+The `socket_timeout_ms` and `connect_timeout_ms` parameters have been tested with **Hive 3.2.0+**; they have not yet been verified on earlier versions (including 3.1.x). SeaTunnel passes them to the JDBC driver as the `socketTimeout` and `connectTimeout` connection properties, so whether they take effect depends on the Hive version and JDBC driver in use.
+
 ## Support Those Engines
 
 > Spark<br/>
@@ -68,6 +72,8 @@ Read external data source data through JDBC.
 | password                     | String     | No       | -               | 
Connection instance password                                                    
                                                                                
                                                                                
                  |
 | query                        | String     | Yes      | -               | 
Query statement                                                                 
                                                                                
                                                                                
                  |
 | connection_check_timeout_sec | Int        | No       | 30              | The 
time in seconds to wait for the database operation used to validate the 
connection to complete                                                          
                                                                                
                      |
+| socket_timeout_ms            | Int        | No       | 86400000        | Socket timeout in milliseconds for reading data from the server (default: 24 hours). Set to 0 to not pass this option to the JDBC driver, in which case the driver's own default applies. Note: tested with Hive 3.2.0+; not yet verified on earlier versions. |
+| connect_timeout_ms           | Int        | No       | 86400000        | Timeout in milliseconds for establishing the connection to the server (default: 24 hours). Set to 0 to not pass this option to the JDBC driver, in which case the driver's own default applies. Note: tested with Hive 3.2.0+; not yet verified on earlier versions. |
 | partition_column             | String     | No       | -               | The 
column name for parallelism's partition, only support numeric type,Only support 
numeric type primary key, and only can config one column.                       
                                                                                
              |
 | partition_lower_bound        | BigDecimal | No       | -               | The 
partition_column min value for scan, if not set SeaTunnel will query database 
get min value.                                                                  
                                                                                
                |
 | partition_upper_bound        | BigDecimal | No       | -               | The 
partition_column max value for scan, if not set SeaTunnel will query database 
get max value.                                                                  
                                                                                
                |
diff --git a/docs/zh/connector-v2/source/HiveJdbc.md 
b/docs/zh/connector-v2/source/HiveJdbc.md
index 094f76083c..0bc5a9ab9a 100644
--- a/docs/zh/connector-v2/source/HiveJdbc.md
+++ b/docs/zh/connector-v2/source/HiveJdbc.md
@@ -8,6 +8,10 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 
 - 确定支持3.1.3和3.1.2,其他版本需要测试。
 
+## 超时参数支持
+
+`socket_timeout_ms` 和 `connect_timeout_ms` 参数已在 **Hive 3.2.0+** 版本上测试验证,更早的版本(包括 3.1.x)暂未验证。这两个参数会以 `socketTimeout` 和 `connectTimeout` 连接属性的形式传递给 JDBC 驱动,实际效果取决于所使用的 Hive 版本及其 JDBC 驱动。
+
 ## 支持这些引擎
 
 > Spark<br/>
@@ -68,6 +72,8 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 | password                     | String     | 否  | -               | 连接实例密码    
                                                                                
                                  |
 | query                        | String     | 是  | -               | 查询sql     
                                                                                
                                  |
 | connection_check_timeout_sec | Int        | 否  | 30              | 
等待用于验证连接的数据库操作完成的时间(秒)                                                          
                                            |
+| socket_timeout_ms            | Int        | 否  | 86400000        | 从服务器读取数据的 Socket 超时时间(毫秒),默认 24 小时。设置为 0 时不向 JDBC 驱动传递该参数,使用驱动自身的默认值。注意:已在 Hive 3.2.0+ 测试,更早版本暂未验证。 |
+| connect_timeout_ms           | Int        | 否  | 86400000        | 建立到服务器连接的超时时间(毫秒),默认 24 小时。设置为 0 时不向 JDBC 驱动传递该参数,使用驱动自身的默认值。注意:已在 Hive 3.2.0+ 测试,更早版本暂未验证。 |
 | partition_column             | String     | 否  | -               | 
并行分区的列名,只支持数值类型,只支持数字类型主键,只能配置一列。                                               
                                            |
 | partition_lower_bound        | BigDecimal | 否  | -               | 
扫描的分区列最小值,如果未设置,SeaTunnel将查询数据库获取最小值。                                           
                                            |
 | partition_upper_bound        | BigDecimal | 否  | -               | 
扫描的分区列最大值,如果没有设置,SeaTunnel将查询数据库获取最大值。                                          
                                            |
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index b98d7cb977..f6681c5e08 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -44,6 +44,7 @@ import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -156,46 +157,57 @@ public class CatalogUtils {
 
     public static List<ConstraintKey> getConstraintKeys(
             DatabaseMetaData metadata, TablePath tablePath) throws 
SQLException {
-        // We set approximate to true to avoid querying the statistics table, 
which is slow.
-        try (ResultSet resultSet =
-                metadata.getIndexInfo(
-                        tablePath.getDatabaseName(),
-                        tablePath.getSchemaName(),
-                        tablePath.getTableName(),
-                        false,
-                        true)) {
-            // index name -> index
-            Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
-            while (resultSet.next()) {
-                String columnName = resultSet.getString("COLUMN_NAME");
-                if (columnName == null) {
-                    continue;
+        try {
+            // We set approximate to true to avoid querying the statistics 
table, which is slow.
+            try (ResultSet resultSet =
+                    metadata.getIndexInfo(
+                            tablePath.getDatabaseName(),
+                            tablePath.getSchemaName(),
+                            tablePath.getTableName(),
+                            false,
+                            true)) {
+                // index name -> index
+                Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+                while (resultSet.next()) {
+                    String columnName = resultSet.getString("COLUMN_NAME");
+                    if (columnName == null) {
+                        continue;
+                    }
+                    String indexName = 
cleanKeyName(resultSet.getString("INDEX_NAME"));
+                    boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
+
+                    ConstraintKey constraintKey =
+                            constraintKeyMap.computeIfAbsent(
+                                    indexName,
+                                    s -> {
+                                        ConstraintKey.ConstraintType 
constraintType =
+                                                
ConstraintKey.ConstraintType.INDEX_KEY;
+                                        if (!noUnique) {
+                                            constraintType =
+                                                    
ConstraintKey.ConstraintType.UNIQUE_KEY;
+                                        }
+                                        return ConstraintKey.of(
+                                                constraintType, indexName, new 
ArrayList<>());
+                                    });
+
+                    ConstraintKey.ColumnSortType sortType =
+                            
"A".equalsIgnoreCase(resultSet.getString("ASC_OR_DESC"))
+                                    ? ConstraintKey.ColumnSortType.ASC
+                                    : ConstraintKey.ColumnSortType.DESC;
+                    ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
+                            new ConstraintKey.ConstraintKeyColumn(columnName, 
sortType);
+                    constraintKey.getColumnNames().add(constraintKeyColumn);
                 }
-                String indexName = 
cleanKeyName(resultSet.getString("INDEX_NAME"));
-                boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
-
-                ConstraintKey constraintKey =
-                        constraintKeyMap.computeIfAbsent(
-                                indexName,
-                                s -> {
-                                    ConstraintKey.ConstraintType 
constraintType =
-                                            
ConstraintKey.ConstraintType.INDEX_KEY;
-                                    if (!noUnique) {
-                                        constraintType = 
ConstraintKey.ConstraintType.UNIQUE_KEY;
-                                    }
-                                    return ConstraintKey.of(
-                                            constraintType, indexName, new 
ArrayList<>());
-                                });
-
-                ConstraintKey.ColumnSortType sortType =
-                        "A".equals(resultSet.getString("ASC_OR_DESC"))
-                                ? ConstraintKey.ColumnSortType.ASC
-                                : ConstraintKey.ColumnSortType.DESC;
-                ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
-                        new ConstraintKey.ConstraintKeyColumn(columnName, 
sortType);
-                constraintKey.getColumnNames().add(constraintKeyColumn);
+                return new ArrayList<>(constraintKeyMap.values());
             }
-            return new ArrayList<>(constraintKeyMap.values());
+        } catch (SQLException e) {
+            // Some JDBC drivers (e.g., Hive/Inceptor) do not fully support 
getIndexInfo()
+            // Return empty list as index information is not mandatory for 
table schema
+            log.warn(
+                    "Failed to get index info for table {}, returning empty 
constraint keys. Error: {}",
+                    tablePath,
+                    e.getMessage());
+            return Collections.emptyList();
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
index 55abfc4694..f0ace0aa14 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
@@ -48,6 +48,20 @@ public class JdbcCommonOptions {
                     .defaultValue(30)
                     .withDescription("connection check time second");
 
+    public static final Option<Integer> SOCKET_TIMEOUT_MS =
+            Options.key("socket_timeout_ms")
+                    .intType()
+                    .defaultValue(1000 * 60 * 60 * 24)
+                    .withDescription(
+                            "Socket timeout in milliseconds for reading data 
from the server. Default is 24h. Set to 0 for no timeout.");
+
+    public static final Option<Integer> CONNECT_TIMEOUT_MS =
+            Options.key("connect_timeout_ms")
+                    .intType()
+                    .defaultValue(1000 * 60 * 60 * 24)
+                    .withDescription(
+                            "Connection timeout in milliseconds for 
establishing connection to the server. Default is 24h. Set to 0 for no 
timeout.");
+
     public static final Option<String> COMPATIBLE_MODE =
             Options.key("compatible_mode")
                     .stringType()
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index b3b49e465f..1c2ed5830f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -53,6 +53,10 @@ public class JdbcConnectionConfig implements Serializable {
 
     private int transactionTimeoutSec = 
JdbcSinkOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
 
+    private int socketTimeoutMs = 
JdbcCommonOptions.SOCKET_TIMEOUT_MS.defaultValue();
+
+    private int connectTimeoutMs = 
JdbcCommonOptions.CONNECT_TIMEOUT_MS.defaultValue();
+
     private boolean useKerberos = 
JdbcCommonOptions.USE_KERBEROS.defaultValue();
 
     private String kerberosPrincipal;
@@ -79,6 +83,8 @@ public class JdbcConnectionConfig implements Serializable {
         builder.maxRetries(config.get(JdbcSinkOptions.MAX_RETRIES));
         builder.connectionCheckTimeoutSeconds(
                 config.get(JdbcCommonOptions.CONNECTION_CHECK_TIMEOUT_SEC));
+        
builder.socketTimeoutMs(config.get(JdbcCommonOptions.SOCKET_TIMEOUT_MS));
+        
builder.connectTimeoutMs(config.get(JdbcCommonOptions.CONNECT_TIMEOUT_MS));
         builder.batchSize(config.get(JdbcSinkOptions.BATCH_SIZE));
         
builder.handleBlobAsString(config.get(JdbcCommonOptions.HANDLE_BLOB_AS_STRING));
         if (config.get(JdbcSinkOptions.IS_EXACTLY_ONCE)) {
@@ -143,6 +149,8 @@ public class JdbcConnectionConfig implements Serializable {
         private boolean handleBlobAsString = 
JdbcCommonOptions.HANDLE_BLOB_AS_STRING.defaultValue();
         private int maxCommitAttempts = 
JdbcSinkOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
         private int transactionTimeoutSec = 
JdbcSinkOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
+        private int socketTimeoutMs = 
JdbcCommonOptions.SOCKET_TIMEOUT_MS.defaultValue();
+        private int connectTimeoutMs = 
JdbcCommonOptions.CONNECT_TIMEOUT_MS.defaultValue();
         private Map<String, String> properties;
         public boolean useKerberos = 
JdbcCommonOptions.USE_KERBEROS.defaultValue();
         public String kerberosPrincipal;
@@ -230,6 +238,16 @@ public class JdbcConnectionConfig implements Serializable {
             return this;
         }
 
+        public Builder socketTimeoutMs(int socketTimeoutMs) {
+            this.socketTimeoutMs = socketTimeoutMs;
+            return this;
+        }
+
+        public Builder connectTimeoutMs(int connectTimeoutMs) {
+            this.connectTimeoutMs = connectTimeoutMs;
+            return this;
+        }
+
         public Builder useKerberos(boolean useKerberos) {
             this.useKerberos = useKerberos;
             return this;
@@ -292,6 +310,8 @@ public class JdbcConnectionConfig implements Serializable {
             jdbcConnectionConfig.autoCommit = this.autoCommit;
             jdbcConnectionConfig.username = this.username;
             jdbcConnectionConfig.transactionTimeoutSec = 
this.transactionTimeoutSec;
+            jdbcConnectionConfig.socketTimeoutMs = this.socketTimeoutMs;
+            jdbcConnectionConfig.connectTimeoutMs = this.connectTimeoutMs;
             jdbcConnectionConfig.maxCommitAttempts = this.maxCommitAttempts;
             jdbcConnectionConfig.xaDataSourceClassName = 
this.xaDataSourceClassName;
             jdbcConnectionConfig.decimalTypeNarrowing = 
this.decimalTypeNarrowing;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
index bd8fbc0c49..d059a32037 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.Simple
 import org.apache.hadoop.conf.Configuration;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.sql.Connection;
 import java.sql.Driver;
@@ -32,6 +33,7 @@ import java.util.Properties;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode.KERBEROS_AUTHENTICATION_FAILED;
 
+@Slf4j
 public class HiveJdbcConnectionProvider extends SimpleJdbcConnectionProvider {
 
     public HiveJdbcConnectionProvider(@NonNull JdbcConnectionConfig 
jdbcConfig) {
@@ -98,7 +100,32 @@ public class HiveJdbcConnectionProvider extends 
SimpleJdbcConnectionProvider {
             jdbcConnectionConfig
                     .getPassword()
                     .ifPresent(password -> info.setProperty("password", 
password));
-            return driver.connect(jdbcConnectionConfig.getUrl(), info);
+
+            int socketTimeoutMs = jdbcConnectionConfig.getSocketTimeoutMs();
+            int connectTimeoutMs = jdbcConnectionConfig.getConnectTimeoutMs();
+
+            if (socketTimeoutMs > 0) {
+                info.setProperty("socketTimeout", 
String.valueOf(socketTimeoutMs));
+            }
+            if (connectTimeoutMs > 0) {
+                info.setProperty("connectTimeout", 
String.valueOf(connectTimeoutMs));
+            }
+
+            Connection connection = 
driver.connect(jdbcConnectionConfig.getUrl(), info);
+
+            if (connection != null) {
+                log.info(
+                        "[HiveConnectionProvider] Connection created 
successfully: {}",
+                        connection.getClass().getName());
+            } else {
+                log.warn("[HiveConnectionProvider] Connection is null!");
+                log.warn("  - URL: {}", jdbcConnectionConfig.getUrl());
+                log.warn("  - User: {}", 
jdbcConnectionConfig.getUsername().orElse("N/A"));
+                log.warn("  - socketTimeout: {} ms (0 = no timeout)", 
socketTimeoutMs);
+                log.warn("  - connectTimeout: {} ms (0 = no timeout)", 
connectTimeoutMs);
+            }
+
+            return connection;
         }
     }
 }

Reply via email to