This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f7994d9ae9 [Feature][Connector-V2][Clickhouse] Add clickhouse.config to the source connector (#7143)
f7994d9ae9 is described below

commit f7994d9ae9770cd2d88afb5244f0071be2910d05
Author: Dongyeon Lee <[email protected]>
AuthorDate: Tue Jul 9 16:39:33 2024 +0900

    [Feature][Connector-V2][Clickhouse] Add clickhouse.config to the source connector (#7143)
    
    * The ClickHouse source connector supports a custom ClickHouse config
    
    * UPDATE doc
    
    * fmt
    
    ---------
    
    Co-authored-by: Dongyeon <[email protected]>
---
 docs/en/connector-v2/source/Clickhouse.md          | 22 ++++++-----
 release-note.md                                    |  3 +-
 .../clickhouse/sink/client/ClickhouseSink.java     |  4 +-
 .../clickhouse/sink/file/ClickhouseFileSink.java   |  3 +-
 .../clickhouse/source/ClickhouseSource.java        | 18 ++++++++-
 .../clickhouse/source/ClickhouseSourceFactory.java |  6 ++-
 .../seatunnel/clickhouse/util/ClickhouseUtil.java  | 45 +++++++++++++---------
 7 files changed, 68 insertions(+), 33 deletions(-)

diff --git a/docs/en/connector-v2/source/Clickhouse.md b/docs/en/connector-v2/source/Clickhouse.md
index c23b25e92e..6fe0a5bb56 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -49,15 +49,16 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 
 ## Source Options
 
-|       Name       |  Type  | Required |        Default         | Description |
-|------------------|--------|----------|------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
-| host             | String | Yes      | -                      | `ClickHouse` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as `"host1:8123,host2:8123"` . |
-| database         | String | Yes      | -                      | The `ClickHouse` database. |
-| sql              | String | Yes      | -                      | The query sql used to search data though Clickhouse server. |
-| username         | String | Yes      | -                      | `ClickHouse` user username. |
-| password         | String | Yes      | -                      | `ClickHouse` user password. |
-| server_time_zone | String | No       | ZoneId.systemDefault() | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. |
-| common-options   |        | No       | -                      | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
+|       Name        |  Type  | Required |        Default         | Description |
+|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| host              | String | Yes      | -                      | `ClickHouse` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as `"host1:8123,host2:8123"` . |
+| database          | String | Yes      | -                      | The `ClickHouse` database. |
+| sql               | String | Yes      | -                      | The query sql used to search data though Clickhouse server. |
+| username          | String | Yes      | -                      | `ClickHouse` user username. |
+| password          | String | Yes      | -                      | `ClickHouse` user password. |
+| clickhouse.config | Map    | No       | -                      | In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc` , users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc`. |
+| server_time_zone  | String | No       | ZoneId.systemDefault() | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. |
+| common-options    |        | No       | -                      | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
 
 ## How to Create a Clickhouse Data Synchronization Jobs
 
@@ -80,6 +81,9 @@ source {
     password = "xxxxx"
     server_time_zone = "UTC"
     result_table_name = "test"
+    clickhouse.config = {
+      "socket_timeout": "300000"
+    }
   }
 }
 
diff --git a/release-note.md b/release-note.md
index 0b5e884572..32067c22df 100644
--- a/release-note.md
+++ b/release-note.md
@@ -56,7 +56,8 @@
 - [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
 - [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
 - [Connector-v2] [Mongodb] Support to convert to double from numeric type that mongodb saved it as numeric internally (#6997)
-- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085) 
+- [Connector-v2] [Redis] Using scan replace keys operation command,support batchWrite in single mode(#7030,#7085)
+- [Connector-V2] [Clickhouse] Add a new optional configuration `clickhouse.config` to the source connector of ClickHouse (#7143)
 
 ### Zeta(ST-Engine)
 
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 5d31755e5b..d2de6fd182 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -113,6 +113,7 @@ public class ClickhouseSink
                             config.getString(DATABASE.key()),
                             config.getString(SERVER_TIME_ZONE.key()),
                             null,
+                            null,
                             null);
         } else {
             nodes =
@@ -121,7 +122,8 @@ public class ClickhouseSink
                             config.getString(DATABASE.key()),
                             config.getString(SERVER_TIME_ZONE.key()),
                             config.getString(USERNAME.key()),
-                            config.getString(PASSWORD.key()));
+                            config.getString(PASSWORD.key()),
+                            null);
         }
 
         Properties clickhouseProperties = new Properties();
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 7a56a0010e..cc63179f16 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -116,7 +116,8 @@ public class ClickhouseFileSink
                         config.getString(DATABASE.key()),
                         config.getString(SERVER_TIME_ZONE.key()),
                         config.getString(USERNAME.key()),
-                        config.getString(PASSWORD.key()));
+                        config.getString(PASSWORD.key()),
+                        null);
 
         ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
         Map<String, String> tableSchema =
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index e49b03091a..a79d2df8de 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -50,7 +50,9 @@ import com.google.common.collect.ImmutableMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
@@ -96,13 +98,27 @@ public class ClickhouseSource
                         .build();
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
+
+        Map<String, String> customConfig = null;
+
+        if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
+            customConfig =
+                    config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            Map.Entry::getKey,
+                                            entrySet ->
+                                                    entrySet.getValue().unwrapped().toString()));
+        }
+
         servers =
                 ClickhouseUtil.createNodes(
                         config.getString(HOST.key()),
                         config.getString(DATABASE.key()),
                         config.getString(SERVER_TIME_ZONE.key()),
                         config.getString(USERNAME.key()),
-                        config.getString(PASSWORD.key()));
+                        config.getString(PASSWORD.key()),
+                        customConfig);
 
         sql = config.getString(SQL.key());
         ClickHouseNode currentServer =
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
index 17e4fef1fd..4adea4b80c 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 
 import com.google.auto.service.AutoService;
 
+import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
@@ -39,7 +40,10 @@ public class ClickhouseSourceFactory implements TableSourceFactory {
 
     @Override
     public OptionRule optionRule() {
-        return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME, PASSWORD).build();
+        return OptionRule.builder()
+                .required(HOST, DATABASE, SQL, USERNAME, PASSWORD)
+                .optional(CLICKHOUSE_CONFIG)
+                .build();
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index e8e491635f..f787cf5c8f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import com.clickhouse.client.ClickHouseCredentials;
@@ -25,6 +26,7 @@ import com.clickhouse.client.ClickHouseProtocol;
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class ClickhouseUtil {
@@ -34,30 +36,35 @@ public class ClickhouseUtil {
             String database,
             String serverTimeZone,
             String username,
-            String password) {
+            String password,
+            Map<String, String> options) {
         return Arrays.stream(nodeAddress.split(","))
                 .map(
                         address -> {
                             String[] nodeAndPort = address.split(":", 2);
-                            if (StringUtils.isEmpty(username) && StringUtils.isEmpty(password)) {
-                                return ClickHouseNode.builder()
-                                        .host(nodeAndPort[0])
-                                        .port(
-                                                ClickHouseProtocol.HTTP,
-                                                Integer.parseInt(nodeAndPort[1]))
-                                        .database(database)
-                                        .timeZone(serverTimeZone)
-                                        .build();
+                            ClickHouseNode.Builder builder =
+                                    ClickHouseNode.builder()
+                                            .host(nodeAndPort[0])
+                                            .port(
+                                                    ClickHouseProtocol.HTTP,
+                                                    Integer.parseInt(nodeAndPort[1]))
+                                            .database(database)
+                                            .timeZone(serverTimeZone);
+                            if (MapUtils.isNotEmpty(options)) {
+                                for (Map.Entry<String, String> entry : options.entrySet()) {
+                                    builder = builder.addOption(entry.getKey(), entry.getValue());
+                                }
                             }
-                            return ClickHouseNode.builder()
-                                    .host(nodeAndPort[0])
-                                    .port(ClickHouseProtocol.HTTP, Integer.parseInt(nodeAndPort[1]))
-                                    .database(database)
-                                    .timeZone(serverTimeZone)
-                                    .credentials(
-                                            ClickHouseCredentials.fromUserAndPassword(
-                                                    username, password))
-                                    .build();
+
+                            if (StringUtils.isNotEmpty(username)
+                                    && StringUtils.isNotEmpty(password)) {
+                                builder =
+                                        builder.credentials(
+                                                ClickHouseCredentials.fromUserAndPassword(
+                                                        username, password));
+                            }
+
+                            return builder.build();
                         })
                 .collect(Collectors.toList());
     }
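
A note for readers of the ClickhouseSource change in this commit: the new block flattens the optional `clickhouse.config` object into a `Map<String, String>` before passing it to `ClickhouseUtil.createNodes`, which then applies each entry with `addOption`. The following is a minimal sketch of that flattening step in plain Java, not code from the commit. `CustomConfigSketch` and `stringifyOptions` are hypothetical names, and a plain `Map<String, Object>` stands in for the Typesafe Config object so the example runs without any dependencies.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class CustomConfigSketch {

    // Hypothetical helper (not part of SeaTunnel): converts every option value to its
    // string form, in the same spirit as the Collectors.toMap call in the diff.
    static Map<String, String> stringifyOptions(Map<String, Object> raw) {
        if (raw == null) {
            // Assumption: mirrors the diff, where customConfig stays null when the
            // clickhouse.config option is not set.
            return null;
        }
        return raw.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()));
    }

    public static void main(String[] args) {
        // Values may arrive as numbers, booleans, or strings depending on how the
        // job config is written; all of them end up as strings.
        Map<String, Object> raw = new LinkedHashMap<>();
        raw.put("socket_timeout", 300000);
        raw.put("compress", true);

        Map<String, String> options = stringifyOptions(raw);
        System.out.println(options.get("socket_timeout"));
        System.out.println(options.get("compress"));
    }
}
```

One consequence of this design is that the connector does not validate option names or value types itself; whatever is placed under `clickhouse.config` is forwarded as strings, so interpreting them is presumably left to the ClickHouse client.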
