This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b6d075194b [bugFix][Connector-V2][Doris] The multi-FE configuration is supported (#6341)
b6d075194b is described below

commit b6d075194bfe79154afb19128964201addd6e2a0
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Mon May 13 21:06:10 2024 +0800

    [bugFix][Connector-V2][Doris] The multi-FE configuration is supported (#6341)
---
 .../connectors/doris/rest/RestService.java         |  3 +-
 .../doris/sink/writer/DorisSinkWriter.java         |  3 +-
 .../doris/rest/models/RestServiceTest.java         | 56 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index 315f36cfa2..b516157443 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -237,7 +237,8 @@ public class RestService implements Serializable {
     }
 
     @VisibleForTesting
-    static String randomEndpoint(String feNodes, Logger logger) throws DorisConnectorException {
+    public static String randomEndpoint(String feNodes, Logger logger)
+            throws DorisConnectorException {
         logger.trace("Parse fenodes '{}'.", feNodes);
         if (StringUtils.isEmpty(feNodes)) {
             String errMsg =
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 496b91b25b..b5aa527421 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+import org.apache.seatunnel.connectors.doris.rest.RestService;
 import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
 import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
 import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
@@ -96,7 +97,7 @@ public class DorisSinkWriter
     }
 
     private void initializeLoad() {
-        String backend = dorisConfig.getFrontends();
+        String backend = RestService.randomEndpoint(dorisConfig.getFrontends(), log);
         try {
             this.dorisStreamLoad =
                     new DorisStreamLoad(
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java
new file mode 100644
index 0000000000..aa917d5766
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.rest.models;
+
+import org.apache.seatunnel.connectors.doris.rest.RestService;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class RestServiceTest {
+
+    @Test
+    void testRandomEndpoint() {
+
+        List<String> list =
+                Arrays.asList(
+                        "fe_host1:fe_http_port1",
+                        "fe_host2:fe_http_port2",
+                        "fe_host3:fe_http_port3",
+                        "fe_host4:fe_http_port4",
+                        "fe_host5:fe_http_port5");
+
+        boolean hasDifferentAddress = false;
+        for (int i = 0; i < 5; i++) {
+            Set<String> addresses =
+                    list.stream()
+                            .map(address -> RestService.randomEndpoint(String.join(",", list), log))
+                            .collect(Collectors.toSet());
+            hasDifferentAddress = addresses.size() > 1;
+        }
+        Assertions.assertTrue(hasDifferentAddress);
+    }
+}

Reply via email to