This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 309b58d12 [Feature][Connector-V2][Clickhouse] Add clickhouse connector 
time zone key,default system time zone (#5078)
309b58d12 is described below

commit 309b58d12d30f6ec7d523c2b9afec9181d3c4c6e
Author: 阿丙 <[email protected]>
AuthorDate: Fri Jul 14 11:08:04 2023 +0800

    [Feature][Connector-V2][Clickhouse] Add clickhouse connector time zone 
key,default system time zone (#5078)
    
    * Add clickhouse connector time zone key,default system time zone
    
    * Modify the document and add clickhouse server_time_zone configuration
---
 docs/en/connector-v2/source/Clickhouse.md          | 22 ++++++++++++++--------
 .../clickhouse/config/ClickhouseConfig.java        | 10 ++++++++++
 .../clickhouse/sink/client/ClickhouseSink.java     |  4 ++++
 .../clickhouse/sink/file/ClickhouseFileSink.java   |  2 ++
 .../clickhouse/source/ClickhouseSource.java        | 11 +++++++++++
 .../seatunnel/clickhouse/util/ClickhouseUtil.java  |  8 +++++++-
 6 files changed, 48 insertions(+), 9 deletions(-)

diff --git a/docs/en/connector-v2/source/Clickhouse.md 
b/docs/en/connector-v2/source/Clickhouse.md
index ef5d99c05..07384875c 100644
--- a/docs/en/connector-v2/source/Clickhouse.md
+++ b/docs/en/connector-v2/source/Clickhouse.md
@@ -20,14 +20,15 @@ supports query SQL and can achieve projection effect.
 
 ## Options
 
-|      name      |  type  | required | default value |
-|----------------|--------|----------|---------------|
-| host           | string | yes      | -             |
-| database       | string | yes      | -             |
-| sql            | string | yes      | -             |
-| username       | string | yes      | -             |
-| password       | string | yes      | -             |
-| common-options |        | no       | -             |
+|       name       |  type  | required |     default value      |
+|------------------|--------|----------|------------------------|
+| host             | string | yes      | -                      |
+| database         | string | yes      | -                      |
+| sql              | string | yes      | -                      |
+| username         | string | yes      | -                      |
+| password         | string | yes      | -                      |
+| server_time_zone | string | no       | ZoneId.systemDefault() |
+| common-options   |        | no       | -                      |
 
 ### host [string]
 
@@ -49,6 +50,10 @@ The query sql used to search data though Clickhouse server
 
 `ClickHouse` user password
 
+### server_time_zone [string]
+
+The session time zone of the database server. If not set, the JVM's 
ZoneId.systemDefault() is used as the server time zone.
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details
@@ -64,6 +69,7 @@ source {
     sql = "select * from test where age = 20 limit 100"
     username = "default"
     password = ""
+    server_time_zone = "UTC"
     result_table_name = "test"
   }
   
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 3d4b19edb..f7c8e032c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
+import java.time.ZoneId;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +76,15 @@ public class ClickhouseConfig {
                     .noDefaultValue()
                     .withDescription("Clickhouse server password");
 
+    /** Clickhouse server time zone; defaults to this JVM's ZoneId.systemDefault() id */
+    public static final Option<String> SERVER_TIME_ZONE =
+            Options.key("server_time_zone")
+                    .stringType()
+                    .defaultValue(ZoneId.systemDefault().getId())
+                    .withDescription(
+                            "The session time zone in database server. "
+                                    + "If not set, then ZoneId.systemDefault() 
is used to determine the server time zone");
+
     /** Split mode when table is distributed engine */
     public static final Option<Boolean> SPLIT_MODE =
             Options.key("split_mode")
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 42a927335..360c59259 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -61,6 +61,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PRIMARY_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SPLIT_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SUPPORT_UPSERT;
@@ -101,6 +102,7 @@ public class ClickhouseSink
                 ImmutableMap.<String, Object>builder()
                         .put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
                         .put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
+                        .put(SERVER_TIME_ZONE.key(), 
SERVER_TIME_ZONE.defaultValue())
                         .build();
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
@@ -111,6 +113,7 @@ public class ClickhouseSink
                     ClickhouseUtil.createNodes(
                             config.getString(HOST.key()),
                             config.getString(DATABASE.key()),
+                            config.getString(SERVER_TIME_ZONE.key()),
                             null,
                             null);
         } else {
@@ -118,6 +121,7 @@ public class ClickhouseSink
                     ClickhouseUtil.createNodes(
                             config.getString(HOST.key()),
                             config.getString(DATABASE.key()),
+                            config.getString(SERVER_TIME_ZONE.key()),
                             config.getString(USERNAME.key()),
                             config.getString(PASSWORD.key()));
         }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 26846b585..762815ee0 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -66,6 +66,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SHARDING_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
@@ -114,6 +115,7 @@ public class ClickhouseFileSink
                 ClickhouseUtil.createNodes(
                         config.getString(HOST.key()),
                         config.getString(DATABASE.key()),
+                        config.getString(SERVER_TIME_ZONE.key()),
                         config.getString(USERNAME.key()),
                         config.getString(PASSWORD.key()));
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 5b902f060..e49b03091 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
@@ -44,13 +45,16 @@ import com.clickhouse.client.ClickHouseFormat;
 import com.clickhouse.client.ClickHouseNode;
 import com.clickhouse.client.ClickHouseResponse;
 import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SERVER_TIME_ZONE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.SQL;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
@@ -86,10 +90,17 @@ public class ClickhouseSource
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
+        Map<String, Object> defaultConfig =
+                ImmutableMap.<String, Object>builder()
+                        .put(SERVER_TIME_ZONE.key(), 
SERVER_TIME_ZONE.defaultValue())
+                        .build();
+
+        config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
         servers =
                 ClickhouseUtil.createNodes(
                         config.getString(HOST.key()),
                         config.getString(DATABASE.key()),
+                        config.getString(SERVER_TIME_ZONE.key()),
                         config.getString(USERNAME.key()),
                         config.getString(PASSWORD.key()));
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
index e70567a11..e8e491635 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseUtil.java
@@ -30,7 +30,11 @@ import java.util.stream.Collectors;
 public class ClickhouseUtil {
 
     public static List<ClickHouseNode> createNodes(
-            String nodeAddress, String database, String username, String 
password) {
+            String nodeAddress,
+            String database,
+            String serverTimeZone,
+            String username,
+            String password) {
         return Arrays.stream(nodeAddress.split(","))
                 .map(
                         address -> {
@@ -42,12 +46,14 @@ public class ClickhouseUtil {
                                                 ClickHouseProtocol.HTTP,
                                                 
Integer.parseInt(nodeAndPort[1]))
                                         .database(database)
+                                        .timeZone(serverTimeZone)
                                         .build();
                             }
                             return ClickHouseNode.builder()
                                     .host(nodeAndPort[0])
                                     .port(ClickHouseProtocol.HTTP, 
Integer.parseInt(nodeAndPort[1]))
                                     .database(database)
+                                    .timeZone(serverTimeZone)
                                     .credentials(
                                             
ClickHouseCredentials.fromUserAndPassword(
                                                     username, password))

Reply via email to