This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f7994d9ae9 [Feature][Connector-V2][Clickhouse] Add clickhouse.config
to the source connector (#7143)
f7994d9ae9 is described below
commit f7994d9ae9770cd2d88afb5244f0071be2910d05
Author: Dongyeon Lee <[email protected]>
AuthorDate: Tue Jul 9 16:39:33 2024 +0900
[Feature][Connector-V2][Clickhouse] Add clickhouse.config to the source
connector (#7143)
* The source connector of Clickhouse supports a custom clickhouse config
* UPDATE doc
* fmt
---------
Co-authored-by: Dongyeon <[email protected]>
---
docs/en/connector-v2/source/Clickhouse.md | 22 ++++++-----
release-note.md | 3 +-
.../clickhouse/sink/client/ClickhouseSink.java | 4 +-
.../clickhouse/sink/file/ClickhouseFileSink.java | 3 +-
.../clickhouse/source/ClickhouseSource.java | 18 ++++++++-
.../clickhouse/source/ClickhouseSourceFactory.java | 6 ++-
.../seatunnel/clickhouse/util/ClickhouseUtil.java | 45 +++++++++++++---------
7 files changed, 68 insertions(+), 33 deletions(-)
diff --git a/docs/en/connector-v2/source/Clickhouse.md
b/docs/en/connector-v2/source/Clickhouse.md
index c23b25e92e..6fe0a5bb56 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -49,15 +49,16 @@ They can be downloaded via install-plugin.sh or from the
Maven central repositor
## Source Options
-| Name | Type | Required | Default |
Description
|
-|------------------|--------|----------|------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
-| host | String | Yes | - | `ClickHouse`
cluster address, the format is `host:port` , allowing multiple `hosts` to be
specified. Such as `"host1:8123,host2:8123"` . |
-| database | String | Yes | - | The
`ClickHouse` database.
|
-| sql | String | Yes | - | The query
sql used to search data though Clickhouse server.
|
-| username | String | Yes | - | `ClickHouse`
user username.
|
-| password | String | Yes | - | `ClickHouse`
user password.
|
-| server_time_zone | String | No | ZoneId.systemDefault() | The session
time zone in database server. If not set, then ZoneId.systemDefault() is used
to determine the server time zone. |
-| common-options | | No | - | Source
plugin common parameters, please refer to [Source Common
Options](common-options.md) for details. |
+| Name | Type | Required | Default |
Description
|
+|-------------------|--------|----------|------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| host | String | Yes | - |
`ClickHouse` cluster address, the format is `host:port` , allowing multiple
`hosts` to be specified. Such as `"host1:8123,host2:8123"` .
|
+| database | String | Yes | - | The
`ClickHouse` database.
|
+| sql | String | Yes | - | The query
sql used to search data through Clickhouse server.
|
+| username | String | Yes | - |
`ClickHouse` user username.
|
+| password | String | Yes | - |
`ClickHouse` user password.
|
+| clickhouse.config | Map | No | - | In addition
to the above mandatory parameters that must be specified by `clickhouse-jdbc` ,
users can also specify multiple optional parameters, which cover all the
[parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration)
provided by `clickhouse-jdbc`. |
+| server_time_zone | String | No | ZoneId.systemDefault() | The session
time zone in database server. If not set, then ZoneId.systemDefault() is used
to determine the server time zone.
|
+| common-options | | No | - | Source
plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
|
## How to Create a Clickhouse Data Synchronization Jobs
@@ -80,6 +81,9 @@ source {
password = "xxxxx"
server_time_zone = "UTC"
result_table_name = "test"
+ clickhouse.config = {
+ "socket_timeout": "300000"
+ }
}
}
diff --git a/release-note.md b/release-note.md
index 0b5e884572..32067c22df 100644
--- a/release-note.md
+++ b/release-note.md
@@ -56,7 +56,8 @@
- [Connector-v2] [File] Inject FileSystem to OrcWriteStrategy
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
- [Connector-v2] [Mongodb] Support to convert to double from numeric type that
mongodb saved it as numeric internally (#6997)
-- [Connector-v2] [Redis] Using scan replace keys operation command,support
batchWrite in single mode(#7030,#7085)
+- [Connector-v2] [Redis] Using scan replace keys operation command,support
batchWrite in single mode(#7030,#7085)
+- [Connector-V2] [Clickhouse] Add a new optional configuration
`clickhouse.config` to the source connector of ClickHouse (#7143)
### Zeta(ST-Engine)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 5d31755e5b..d2de6fd182 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -113,6 +113,7 @@ public class ClickhouseSink
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
null,
+ null,
null);
} else {
nodes =
@@ -121,7 +122,8 @@ public class ClickhouseSink
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
- config.getString(PASSWORD.key()));
+ config.getString(PASSWORD.key()),
+ null);
}
Properties clickhouseProperties = new Properties();
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 7a56a0010e..cc63179f16 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -116,7 +116,8 @@ public class ClickhouseFileSink
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
- config.getString(PASSWORD.key()));
+ config.getString(PASSWORD.key()),
+ null);
ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
Map<String, String> tableSchema =
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index e49b03091a..a79d2df8de 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -50,7 +50,9 @@ import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
@@ -96,13 +98,27 @@ public class ClickhouseSource
.build();
config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
+
+ Map<String, String> customConfig = null;
+
+ if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
+ customConfig =
+
config.getObject(CLICKHOUSE_CONFIG.key()).entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entrySet ->
+
entrySet.getValue().unwrapped().toString()));
+ }
+
servers =
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
- config.getString(PASSWORD.key()));
+ config.getString(PASSWORD.key()),
+ customConfig);
sql = config.getString(SQL.key());
ClickHouseNode currentServer =
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
index 17e4fef1fd..4adea4b80c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactory;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
@@ -39,7 +40,10 @@ public class ClickhouseSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
- return OptionRule.builder().required(HOST, DATABASE, SQL, USERNAME,
PASSWORD).build();
+ return OptionRule.builder()
+ .required(HOST, DATABASE, SQL, USERNAME, PASSWORD)
+ .optional(CLICKHOUSE_CONFIG)
+ .build();
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index e8e491635f..f787cf5c8f 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import com.clickhouse.client.ClickHouseCredentials;
@@ -25,6 +26,7 @@ import com.clickhouse.client.ClickHouseProtocol;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class ClickhouseUtil {
@@ -34,30 +36,35 @@ public class ClickhouseUtil {
String database,
String serverTimeZone,
String username,
- String password) {
+ String password,
+ Map<String, String> options) {
return Arrays.stream(nodeAddress.split(","))
.map(
address -> {
String[] nodeAndPort = address.split(":", 2);
- if (StringUtils.isEmpty(username) &&
StringUtils.isEmpty(password)) {
- return ClickHouseNode.builder()
- .host(nodeAndPort[0])
- .port(
- ClickHouseProtocol.HTTP,
-
Integer.parseInt(nodeAndPort[1]))
- .database(database)
- .timeZone(serverTimeZone)
- .build();
+ ClickHouseNode.Builder builder =
+ ClickHouseNode.builder()
+ .host(nodeAndPort[0])
+ .port(
+ ClickHouseProtocol.HTTP,
+
Integer.parseInt(nodeAndPort[1]))
+ .database(database)
+ .timeZone(serverTimeZone);
+ if (MapUtils.isNotEmpty(options)) {
+ for (Map.Entry<String, String> entry :
options.entrySet()) {
+ builder =
builder.addOption(entry.getKey(), entry.getValue());
+ }
}
- return ClickHouseNode.builder()
- .host(nodeAndPort[0])
- .port(ClickHouseProtocol.HTTP,
Integer.parseInt(nodeAndPort[1]))
- .database(database)
- .timeZone(serverTimeZone)
- .credentials(
-
ClickHouseCredentials.fromUserAndPassword(
- username, password))
- .build();
+
+ if (StringUtils.isNotEmpty(username)
+ && StringUtils.isNotEmpty(password)) {
+ builder =
+ builder.credentials(
+
ClickHouseCredentials.fromUserAndPassword(
+ username, password));
+ }
+
+ return builder.build();
})
.collect(Collectors.toList());
}