This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 757641bada [Imporve][ClickhouseFile] Directly connect to each shard
node to obtain the corresponding path (#8449)
757641bada is described below
commit 757641bada16cf59400723d0720124abee3d1e26
Author: Cancai Cai <[email protected]>
AuthorDate: Mon Jan 6 10:32:09 2025 +0800
[Imporve][ClickhouseFile] Directly connect to each shard node to obtain the
corresponding path (#8449)
---
.../connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java | 5 ++++-
.../seatunnel/clickhouse/sink/file/ClickhouseFileSink.java | 4 +++-
.../clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java | 1 +
.../seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java | 5 +++++
.../connectors/seatunnel/clickhouse/util/ClickhouseProxy.java | 3 ++-
5 files changed, 15 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 644b078b5b..ff153a4c34 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -103,7 +103,10 @@ public class ClickhouseSink
String shardKey = null;
String shardKeyType = null;
ClickhouseTable table =
- proxy.getClickhouseTable(readonlyConfig.get(DATABASE),
readonlyConfig.get(TABLE));
+ proxy.getClickhouseTable(
+ proxy.getClickhouseConnection(),
+ readonlyConfig.get(DATABASE),
+ readonlyConfig.get(TABLE));
if (readonlyConfig.get(SPLIT_MODE)) {
if (!"Distributed".equals(table.getEngine())) {
throw new ClickhouseConnectorException(
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index e1c4afd6fd..f51f1d4a97 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -127,7 +127,9 @@ public class ClickhouseFileSink
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
ClickhouseTable table =
proxy.getClickhouseTable(
- config.getString(DATABASE.key()),
config.getString(TABLE.key()));
+ proxy.getClickhouseConnection(),
+ config.getString(DATABASE.key()),
+ config.getString(TABLE.key()));
String shardKey = null;
String shardKeyType = null;
if (config.hasPath(SHARDING_KEY.key())) {
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index def47b659d..28c6d250d7 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -50,6 +50,7 @@ public class ClickhouseFileSinkAggCommitter
proxy = new
ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
clickhouseTable =
proxy.getClickhouseTable(
+ proxy.getClickhouseConnection(),
readerOption.getShardMetadata().getDatabase(),
readerOption.getShardMetadata().getTable());
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 984fe12127..dd29824aba 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -33,6 +33,7 @@ import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy
import org.apache.commons.io.FileUtils;
+import com.clickhouse.client.ClickHouseRequest;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
@@ -91,6 +92,7 @@ public class ClickhouseFileSinkWriter
shardRouter = new ShardRouter(proxy,
this.readerOption.getShardMetadata());
clickhouseTable =
proxy.getClickhouseTable(
+ proxy.getClickhouseConnection(),
this.readerOption.getShardMetadata().getDatabase(),
this.readerOption.getShardMetadata().getTable());
rowCache = new HashMap<>(Common.COLLECTION_SIZE);
@@ -105,8 +107,11 @@ public class ClickhouseFileSinkWriter
Collectors.toMap(
Function.identity(),
shard -> {
+ ClickHouseRequest<?> request =
+
proxy.getClickhouseConnection(shard);
ClickhouseTable shardTable =
proxy.getClickhouseTable(
+ request,
shard.getNode().getDatabase().get(),
clickhouseTable.getLocalTableName());
return shardTable.getDataPaths();
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
index 4074760c23..275e16791b 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java
@@ -233,7 +233,8 @@ public class ClickhouseProxy {
* @param table table name of the table.
* @return clickhouse table info.
*/
- public ClickhouseTable getClickhouseTable(String database, String table) {
+ public ClickhouseTable getClickhouseTable(
+ ClickHouseRequest<?> clickhouseRequest, String database, String
table) {
String sql =
String.format(
"select
engine,create_table_query,engine_full,data_paths,sorting_key from system.tables
where database = '%s' and name = '%s'",