This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3f1dcfc91 [Bug] [Connector-V2] Clickhouse File Connector not support
split mode for write data to all shards of distributed table (#4035)
3f1dcfc91 is described below
commit 3f1dcfc9157647b98946171a034a01a60dc8fd0e
Author: sanyu <[email protected]>
AuthorDate: Fri Feb 3 14:28:24 2023 +0800
[Bug] [Connector-V2] Clickhouse File Connector not support split mode for
write data to all shards of distributed table (#4035)
---
.../seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java | 6 +++---
.../seatunnel/clickhouse/sink/client/ClickhouseProxy.java | 2 +-
.../connectors/seatunnel/clickhouse/sink/client/ShardRouter.java | 2 +-
.../seatunnel/clickhouse/sink/file/ClickhouseFileSink.java | 2 +-
.../seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java | 2 +-
5 files changed, 7 insertions(+), 7 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
index 2fe726325..bad4a752f 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/shard/Shard.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.clickhouse.shard;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseNode;
+import com.clickhouse.client.ClickHouseProtocol;
import java.io.Serializable;
-import java.net.InetSocketAddress;
import java.util.Objects;
public class Shard implements Serializable {
@@ -46,8 +46,8 @@ public class Shard implements Serializable {
String password) {
this.shardNum = shardNum;
this.replicaNum = replicaNum;
- this.node = ClickHouseNode.builder().host(hostname).address(InetSocketAddress.createUnresolved(hostAddress,
- port)).database(database).weight(shardWeight).credentials(ClickHouseCredentials.fromUserAndPassword(username, password)).build();
+ this.node = ClickHouseNode.builder().host(hostname).port(ClickHouseProtocol.HTTP,
+ port).database(database).weight(shardWeight).credentials(ClickHouseCredentials.fromUserAndPassword(username, password)).build();
}
public Shard(int shardNum, int replicaNum, ClickHouseNode node) {
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
index da4d401aa..99080f627 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -141,7 +141,7 @@ public class ClickhouseProxy {
*/
public List<Shard> getClusterShardList(ClickHouseRequest<?> connection, String clusterName,
String database, int port, String username, String password) {
- String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'";
+ String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "'" + " and replica_num=1";
List<Shard> shardList = new ArrayList<>();
try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
response.records().forEach(r -> {
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 3e167e37b..d3078dc41 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -98,7 +98,7 @@ public class ShardRouter implements Serializable {
return shards.firstEntry().getValue();
}
if (StringUtils.isEmpty(shardKey) || shardValue == null) {
- return shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount + 1)).getValue();
+ return shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount) + 1).getValue();
}
int offset = (int) (HASH_INSTANCE.hash(ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)),
0) & Long.MAX_VALUE % shardWeightCount);
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index 192ad6815..0d67dce65 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -116,7 +116,7 @@ public class ClickhouseFileSink implements SeaTunnelSink<SeaTunnelRow, Clickhous
config.getString(DATABASE.key()),
config.getString(TABLE.key()),
table.getEngine(),
- false, // we don't need to set splitMode in clickhouse file mode.
+ true,
new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
List<String> fields = new ArrayList<>(tableSchema.keySet());
Map<String, String> nodeUser = config.getObjectList(NODE_PASS.key()).stream()
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index 14b5c181c..c14ccd2dc 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -258,7 +258,7 @@ public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKFile
}
private void moveClickhouseLocalFileToServer(Shard shard, List<String> clickhouseLocalFiles) {
- String hostAddress = shard.getNode().getAddress().getHostName();
+ String hostAddress = shard.getNode().getHost();
String user = readerOption.getNodeUser().getOrDefault(hostAddress, "root");
String password = readerOption.getNodePassword().getOrDefault(hostAddress, null);
FileTransfer fileTransfer = FileTransferFactory.createFileTransfer(this.readerOption.getCopyMethod(), hostAddress, user, password);