This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 757641bada [Imporve][ClickhouseFile] Directly connect to each shard 
node to obtain the corresponding path (#8449)
757641bada is described below

commit 757641bada16cf59400723d0720124abee3d1e26
Author: Cancai Cai <[email protected]>
AuthorDate: Mon Jan 6 10:32:09 2025 +0800

    [Imporve][ClickhouseFile] Directly connect to each shard node to obtain the 
corresponding path (#8449)
---
 .../connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java  | 5 ++++-
 .../seatunnel/clickhouse/sink/file/ClickhouseFileSink.java           | 4 +++-
 .../clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java         | 1 +
 .../seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java     | 5 +++++
 .../connectors/seatunnel/clickhouse/util/ClickhouseProxy.java        | 3 ++-
 5 files changed, 15 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 644b078b5b..ff153a4c34 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -103,7 +103,10 @@ public class ClickhouseSink
         String shardKey = null;
         String shardKeyType = null;
         ClickhouseTable table =
-                proxy.getClickhouseTable(readonlyConfig.get(DATABASE), 
readonlyConfig.get(TABLE));
+                proxy.getClickhouseTable(
+                        proxy.getClickhouseConnection(),
+                        readonlyConfig.get(DATABASE),
+                        readonlyConfig.get(TABLE));
         if (readonlyConfig.get(SPLIT_MODE)) {
             if (!"Distributed".equals(table.getEngine())) {
                 throw new ClickhouseConnectorException(
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index e1c4afd6fd..f51f1d4a97 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -127,7 +127,9 @@ public class ClickhouseFileSink
                 proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
         ClickhouseTable table =
                 proxy.getClickhouseTable(
-                        config.getString(DATABASE.key()), 
config.getString(TABLE.key()));
+                        proxy.getClickhouseConnection(),
+                        config.getString(DATABASE.key()),
+                        config.getString(TABLE.key()));
         String shardKey = null;
         String shardKeyType = null;
         if (config.hasPath(SHARDING_KEY.key())) {
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index def47b659d..28c6d250d7 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -50,6 +50,7 @@ public class ClickhouseFileSinkAggCommitter
         proxy = new 
ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
         clickhouseTable =
                 proxy.getClickhouseTable(
+                        proxy.getClickhouseConnection(),
                         readerOption.getShardMetadata().getDatabase(),
                         readerOption.getShardMetadata().getTable());
     }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 984fe12127..dd29824aba 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -33,6 +33,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy
 
 import org.apache.commons.io.FileUtils;
 
+import com.clickhouse.client.ClickHouseRequest;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.BufferedReader;
@@ -91,6 +92,7 @@ public class ClickhouseFileSinkWriter
         shardRouter = new ShardRouter(proxy, 
this.readerOption.getShardMetadata());
         clickhouseTable =
                 proxy.getClickhouseTable(
+                        proxy.getClickhouseConnection(),
                         this.readerOption.getShardMetadata().getDatabase(),
                         this.readerOption.getShardMetadata().getTable());
         rowCache = new HashMap<>(Common.COLLECTION_SIZE);
@@ -105,8 +107,11 @@ public class ClickhouseFileSinkWriter
                                 Collectors.toMap(
                                         Function.identity(),
                                         shard -> {
+                                            ClickHouseRequest<?> request =
+                                                    
proxy.getClickhouseConnection(shard);
                                             ClickhouseTable shardTable =
                                                     proxy.getClickhouseTable(
+                                                            request,
                                                             
shard.getNode().getDatabase().get(),
                                                             
clickhouseTable.getLocalTableName());
                                             return shardTable.getDataPaths();
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index 4074760c23..275e16791b 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -233,7 +233,8 @@ public class ClickhouseProxy {
      * @param table table name of the table.
      * @return clickhouse table info.
      */
-    public ClickhouseTable getClickhouseTable(String database, String table) {
+    public ClickhouseTable getClickhouseTable(
+            ClickHouseRequest<?> clickhouseRequest, String database, String 
table) {
         String sql =
                 String.format(
                         "select 
engine,create_table_query,engine_full,data_paths,sorting_key from system.tables 
where database = '%s' and name = '%s'",

Reply via email to