This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5560e3dab2 [hotfix] fix exceptions caused by operator priority in
connector-clickhouse when using sharding_key (#8162)
5560e3dab2 is described below
commit 5560e3dab2cab66de881ec7aa469dc094af708d0
Author: Nova <[email protected]>
AuthorDate: Tue Jan 7 18:56:03 2025 +0800
[hotfix] fix exceptions caused by operator priority in connector-clickhouse
when using sharding_key (#8162)
---
.../clickhouse/sink/client/ShardRouter.java | 15 +--
.../seatunnel/clickhouse/ShardRouterTest.java | 116 +++++++++++++++++++++
2 files changed, 124 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index af115d300e..9ecfe36b8b 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -113,13 +113,14 @@ public class ShardRouter implements Serializable {
}
int offset =
(int)
- (HASH_INSTANCE.hash(
- ByteBuffer.wrap(
- shardValue
- .toString()
-
.getBytes(StandardCharsets.UTF_8)),
- 0)
- & Long.MAX_VALUE % shardWeightCount);
+ ((HASH_INSTANCE.hash(
+ ByteBuffer.wrap(
+ shardValue
+ .toString()
+
.getBytes(StandardCharsets.UTF_8)),
+ 0)
+ & Long.MAX_VALUE)
+ % shardWeightCount);
return shards.lowerEntry(offset + 1).getValue();
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ShardRouterTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ShardRouterTest.java
new file mode 100644
index 0000000000..37f4081ec0
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ShardRouterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ShardRouterTest {
+
+ @Test
+ public void testWithShardRouterGetShardRight() {
+ String clusterName = "default";
+ String database = "test_db";
+ String localTable = "test_table_local";
+ String localTableEngine = "ReplicatedMergeTree";
+ String localTableDDL =
+ "create table test_db.test_table_local (token String) ENGINE =
ReplicatedMergeTree()";
+ String username = "test";
+ String password = "123456";
+
+ // Assuming there are 28 clickhouse nodes with 2 replica
+ List<Shard> shardList = new ArrayList<>();
+ Set<Integer> expected = new TreeSet<>();
+ for (int i = 1; i <= 14; i++) {
+ expected.add(i);
+ Shard shard =
+ new Shard(
+ i,
+ 1,
+ 1,
+ "shard" + i,
+ "shard" + i,
+ 9000,
+ database,
+ username,
+ password,
+ Collections.emptyMap());
+ shardList.add(shard);
+ }
+
+ DistributedEngine distributedEngine =
+ new DistributedEngine(
+ clusterName, database, localTable, localTableEngine,
localTableDDL);
+ ClickhouseProxy proxy = Mockito.mock(ClickhouseProxy.class);
+
Mockito.when(proxy.getClickhouseConnection(Mockito.any(Shard.class))).thenReturn(null);
+ Mockito.when(
+ proxy.getClickhouseDistributedTable(
+ Mockito.eq(null), Mockito.anyString(),
Mockito.anyString()))
+ .thenReturn(distributedEngine);
+ Mockito.when(
+ proxy.getClusterShardList(
+ Mockito.eq(null),
+ Mockito.eq("default"),
+ Mockito.eq("test_db"),
+ Mockito.eq(9000),
+ Mockito.eq(null),
+ Mockito.eq(null),
+ Mockito.eq(Collections.emptyMap())))
+ .thenReturn(shardList);
+
+ String shardKey = "token";
+ String shardKeyType = "String";
+ ShardMetadata shardMetadata =
+ new ShardMetadata(
+ shardKey,
+ shardKeyType,
+ shardKey,
+ database,
+ localTable,
+ localTableEngine,
+ true,
+ shardList.get(0));
+
+ Set<Integer> actual = new TreeSet<>();
+ ShardRouter shardRouter = new ShardRouter(proxy, shardMetadata);
+ for (int i = 0; i < 10000000; i++) {
+ byte[] randomBytes = new byte[16];
+ ThreadLocalRandom.current().nextBytes(randomBytes);
+ Shard shard = shardRouter.getShard(Arrays.toString(randomBytes));
+ int shardNum = shard.getShardNum();
+ actual.add(shardNum);
+ }
+
+ Assertions.assertEquals(expected, actual);
+ }
+}