This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5560e3dab2 [hotfix] fix exceptions caused by operator priority in connector-clickhouse when using sharding_key (#8162)
5560e3dab2 is described below

commit 5560e3dab2cab66de881ec7aa469dc094af708d0
Author: Nova <[email protected]>
AuthorDate: Tue Jan 7 18:56:03 2025 +0800

    [hotfix] fix exceptions caused by operator priority in connector-clickhouse when using sharding_key (#8162)
---
 .../clickhouse/sink/client/ShardRouter.java        |  15 +--
 .../seatunnel/clickhouse/ShardRouterTest.java      | 116 +++++++++++++++++++++
 2 files changed, 124 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index af115d300e..9ecfe36b8b 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -113,13 +113,14 @@ public class ShardRouter implements Serializable {
         }
         int offset =
                 (int)
-                        (HASH_INSTANCE.hash(
-                                        ByteBuffer.wrap(
-                                                shardValue
-                                                        .toString()
-                                                        
.getBytes(StandardCharsets.UTF_8)),
-                                        0)
-                                & Long.MAX_VALUE % shardWeightCount);
+                        ((HASH_INSTANCE.hash(
+                                                ByteBuffer.wrap(
+                                                        shardValue
+                                                                .toString()
+                                                                
.getBytes(StandardCharsets.UTF_8)),
+                                                0)
+                                        & Long.MAX_VALUE)
+                                % shardWeightCount);
         return shards.lowerEntry(offset + 1).getValue();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ShardRouterTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ShardRouterTest.java
new file mode 100644
index 0000000000..37f4081ec0
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ShardRouterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse;
+
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.DistributedEngine;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ShardRouterTest {
+
+    @Test
+    public void testWithShardRouterGetShardRight() {
+        String clusterName = "default";
+        String database = "test_db";
+        String localTable = "test_table_local";
+        String localTableEngine = "ReplicatedMergeTree";
+        String localTableDDL =
+                "create table test_db.test_table_local (token String) ENGINE = 
ReplicatedMergeTree()";
+        String username = "test";
+        String password = "123456";
+
+        // Assuming there are 28 ClickHouse nodes with 2 replicas each, i.e. 14 shards
+        List<Shard> shardList = new ArrayList<>();
+        Set<Integer> expected = new TreeSet<>();
+        for (int i = 1; i <= 14; i++) {
+            expected.add(i);
+            Shard shard =
+                    new Shard(
+                            i,
+                            1,
+                            1,
+                            "shard" + i,
+                            "shard" + i,
+                            9000,
+                            database,
+                            username,
+                            password,
+                            Collections.emptyMap());
+            shardList.add(shard);
+        }
+
+        DistributedEngine distributedEngine =
+                new DistributedEngine(
+                        clusterName, database, localTable, localTableEngine, 
localTableDDL);
+        ClickhouseProxy proxy = Mockito.mock(ClickhouseProxy.class);
+        
Mockito.when(proxy.getClickhouseConnection(Mockito.any(Shard.class))).thenReturn(null);
+        Mockito.when(
+                        proxy.getClickhouseDistributedTable(
+                                Mockito.eq(null), Mockito.anyString(), 
Mockito.anyString()))
+                .thenReturn(distributedEngine);
+        Mockito.when(
+                        proxy.getClusterShardList(
+                                Mockito.eq(null),
+                                Mockito.eq("default"),
+                                Mockito.eq("test_db"),
+                                Mockito.eq(9000),
+                                Mockito.eq(null),
+                                Mockito.eq(null),
+                                Mockito.eq(Collections.emptyMap())))
+                .thenReturn(shardList);
+
+        String shardKey = "token";
+        String shardKeyType = "String";
+        ShardMetadata shardMetadata =
+                new ShardMetadata(
+                        shardKey,
+                        shardKeyType,
+                        shardKey,
+                        database,
+                        localTable,
+                        localTableEngine,
+                        true,
+                        shardList.get(0));
+
+        Set<Integer> actual = new TreeSet<>();
+        ShardRouter shardRouter = new ShardRouter(proxy, shardMetadata);
+        for (int i = 0; i < 10000000; i++) {
+            byte[] randomBytes = new byte[16];
+            ThreadLocalRandom.current().nextBytes(randomBytes);
+            Shard shard = shardRouter.getShard(Arrays.toString(randomBytes));
+            int shardNum = shard.getShardNum();
+            actual.add(shardNum);
+        }
+
+        Assertions.assertEquals(expected, actual);
+    }
+}

Reply via email to