This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3f1dcfc91 [Bug] [Connector-V2] Clickhouse File Connector not support split mode for write data to all shards of distributed table (#4035)
3f1dcfc91 is described below

commit 3f1dcfc9157647b98946171a034a01a60dc8fd0e
Author: sanyu <[email protected]>
AuthorDate: Fri Feb 3 14:28:24 2023 +0800

    [Bug] [Connector-V2] Clickhouse File Connector not support split mode for write data to all shards of distributed table (#4035)
---
 .../seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java      | 6 +++---
 .../seatunnel/clickhouse/sink/client/ClickhouseProxy.java           | 2 +-
 .../connectors/seatunnel/clickhouse/sink/client/ShardRouter.java    | 2 +-
 .../seatunnel/clickhouse/sink/file/ClickhouseFileSink.java          | 2 +-
 .../seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java    | 2 +-
 5 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
index 2fe726325..bad4a752f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
 
 import com.clickhouse.client.ClickHouseCredentials;
 import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
 
 import java.io.Serializable;
-import java.net.InetSocketAddress;
 import java.util.Objects;
 
 public class Shard implements Serializable {
@@ -46,8 +46,8 @@ public class Shard implements Serializable {
                  String password) {
         this.shardNum = shardNum;
         this.replicaNum = replicaNum;
-        this.node = 
ClickHouseNode.builder().host(hostname).address(InetSocketAddress.createUnresolved(hostAddress,
-                
port)).database(database).weight(shardWeight).credentials(ClickHouseCredentials.fromUserAndPassword(username,
 password)).build();
+        this.node = 
ClickHouseNode.builder().host(hostname).port(ClickHouseProtocol.HTTP,
+            
port).database(database).weight(shardWeight).credentials(ClickHouseCredentials.fromUserAndPassword(username,
 password)).build();
     }
 
     public Shard(int shardNum, int replicaNum, ClickHouseNode node) {
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
index da4d401aa..99080f627 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -141,7 +141,7 @@ public class ClickhouseProxy {
      */
     public List<Shard> getClusterShardList(ClickHouseRequest<?> connection, 
String clusterName,
                                            String database, int port, String 
username, String password) {
-        String sql = "select 
shard_num,shard_weight,replica_num,host_name,host_address,port from 
system.clusters where cluster = '" + clusterName + "'";
+        String sql = "select 
shard_num,shard_weight,replica_num,host_name,host_address,port from 
system.clusters where cluster = '" + clusterName + "'" + " and replica_num=1";
         List<Shard> shardList = new ArrayList<>();
         try (ClickHouseResponse response = 
connection.query(sql).executeAndWait()) {
             response.records().forEach(r -> {
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 3e167e37b..d3078dc41 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -98,7 +98,7 @@ public class ShardRouter implements Serializable {
             return shards.firstEntry().getValue();
         }
         if (StringUtils.isEmpty(shardKey) || shardValue == null) {
-            return 
shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount + 1)).getValue();
+            return 
shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount) + 1).getValue();
         }
         int offset = (int) 
(HASH_INSTANCE.hash(ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)),
             0) & Long.MAX_VALUE % shardWeightCount);
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 192ad6815..0d67dce65 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -116,7 +116,7 @@ public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, Clickhous
             config.getString(DATABASE.key()),
             config.getString(TABLE.key()),
             table.getEngine(),
-            false, // we don't need to set splitMode in clickhouse file mode.
+            true,
             new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), 
config.getString(PASSWORD.key()));
         List<String> fields = new ArrayList<>(tableSchema.keySet());
         Map<String, String> nodeUser = 
config.getObjectList(NODE_PASS.key()).stream()
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 14b5c181c..c14ccd2dc 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -258,7 +258,7 @@ public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKFile
     }
 
     private void moveClickhouseLocalFileToServer(Shard shard, List<String> 
clickhouseLocalFiles) {
-        String hostAddress = shard.getNode().getAddress().getHostName();
+        String hostAddress = shard.getNode().getHost();
         String user = readerOption.getNodeUser().getOrDefault(hostAddress, 
"root");
         String password = 
readerOption.getNodePassword().getOrDefault(hostAddress, null);
         FileTransfer fileTransfer = 
FileTransferFactory.createFileTransfer(this.readerOption.getCopyMethod(), 
hostAddress, user, password);

Reply via email to