This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 309b58d12 [Feature][Connector-V2][Clickhouse] Add clickhouse connector
time zone key,default system time zone (#5078)
309b58d12 is described below
commit 309b58d12d30f6ec7d523c2b9afec9181d3c4c6e
Author: 阿丙 <[email protected]>
AuthorDate: Fri Jul 14 11:08:04 2023 +0800
[Feature][Connector-V2][Clickhouse] Add clickhouse connector time zone
key,default system time zone (#5078)
* Add clickhouse connector time zone key,default system time zone
* Modify the document and add clickhouse server_time_zone configuration
---
docs/en/connector-v2/source/Clickhouse.md | 22 ++++++++++++++--------
.../clickhouse/config/ClickhouseConfig.java | 10 ++++++++++
.../clickhouse/sink/client/ClickhouseSink.java | 4 ++++
.../clickhouse/sink/file/ClickhouseFileSink.java | 2 ++
.../clickhouse/source/ClickhouseSource.java | 11 +++++++++++
.../seatunnel/clickhouse/util/ClickhouseUtil.java | 8 +++++++-
6 files changed, 48 insertions(+), 9 deletions(-)
diff --git a/docs/en/connector-v2/source/Clickhouse.md
b/docs/en/connector-v2/source/Clickhouse.md
index ef5d99c05..07384875c 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -20,14 +20,15 @@ supports query SQL and can achieve projection effect.
## Options
-| name | type | required | default value |
-|----------------|--------|----------|---------------|
-| host | string | yes | - |
-| database | string | yes | - |
-| sql | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|------------------|--------|----------|------------------------|
+| host | string | yes | - |
+| database | string | yes | - |
+| sql | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| server_time_zone | string | no | ZoneId.systemDefault() |
+| common-options | | no | - |
### host [string]
@@ -49,6 +50,10 @@ The query sql used to search data though Clickhouse server
`ClickHouse` user password
+### server_time_zone [string]
+
+The session time zone in database server. If not set, then
ZoneId.systemDefault() is used to determine the server time zone.
+
### common options
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
@@ -64,6 +69,7 @@ source {
sql = "select * from test where age = 20 limit 100"
username = "default"
password = ""
+ server_time_zone = "UTC"
result_table_name = "test"
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 3d4b19edb..f7c8e032c 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
+import java.time.ZoneId;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -75,6 +76,15 @@ public class ClickhouseConfig {
.noDefaultValue()
.withDescription("Clickhouse server password");
+ /** Clickhouse server timezone */
+ public static final Option<String> SERVER_TIME_ZONE =
+ Options.key("server_time_zone")
+ .stringType()
+ .defaultValue(ZoneId.systemDefault().getId())
+ .withDescription(
+ "The session time zone in database server."
+ + "If not set, then ZoneId.systemDefault()
is used to determine the server time zone");
+
/** Split mode when table is distributed engine */
public static final Option<Boolean> SPLIT_MODE =
Options.key("split_mode")
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 42a927335..360c59259 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -61,6 +61,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
@@ -101,6 +102,7 @@ public class ClickhouseSink
ImmutableMap.<String, Object>builder()
.put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
.put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
+ .put(SERVER_TIME_ZONE.key(),
SERVER_TIME_ZONE.defaultValue())
.build();
config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
@@ -111,6 +113,7 @@ public class ClickhouseSink
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
+ config.getString(SERVER_TIME_ZONE.key()),
null,
null);
} else {
@@ -118,6 +121,7 @@ public class ClickhouseSink
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
+ config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 26846b585..762815ee0 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -66,6 +66,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
@@ -114,6 +115,7 @@ public class ClickhouseFileSink
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
+ config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 5b902f060..e49b03091 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -44,13 +45,16 @@ import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseResponse;
import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
@@ -86,10 +90,17 @@ public class ClickhouseSource
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE,
result.getMsg()));
}
+ Map<String, Object> defaultConfig =
+ ImmutableMap.<String, Object>builder()
+ .put(SERVER_TIME_ZONE.key(),
SERVER_TIME_ZONE.defaultValue())
+ .build();
+
+ config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
servers =
ClickhouseUtil.createNodes(
config.getString(HOST.key()),
config.getString(DATABASE.key()),
+ config.getString(SERVER_TIME_ZONE.key()),
config.getString(USERNAME.key()),
config.getString(PASSWORD.key()));
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index e70567a11..e8e491635 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -30,7 +30,11 @@ import java.util.stream.Collectors;
public class ClickhouseUtil {
public static List<ClickHouseNode> createNodes(
- String nodeAddress, String database, String username, String
password) {
+ String nodeAddress,
+ String database,
+ String serverTimeZone,
+ String username,
+ String password) {
return Arrays.stream(nodeAddress.split(","))
.map(
address -> {
@@ -42,12 +46,14 @@ public class ClickhouseUtil {
ClickHouseProtocol.HTTP,
Integer.parseInt(nodeAndPort[1]))
.database(database)
+ .timeZone(serverTimeZone)
.build();
}
return ClickHouseNode.builder()
.host(nodeAndPort[0])
.port(ClickHouseProtocol.HTTP,
Integer.parseInt(nodeAndPort[1]))
.database(database)
+ .timeZone(serverTimeZone)
.credentials(
ClickHouseCredentials.fromUserAndPassword(
username, password))