This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d1b86e4  [Enhancemant] Add load balance strategy for frontends and 
backends (#329)
d1b86e4 is described below

commit d1b86e41172a6da1a101d56805328e0111241c25
Author: Lei <[email protected]>
AuthorDate: Thu Jul 24 11:09:14 2025 +0800

    [Enhancemant] Add load balance strategy for frontends and backends (#329)
---
 .../doris/spark/client/DorisBackendHttpClient.java |   7 +-
 .../doris/spark/client/DorisFrontendClient.java    |  26 ++---
 .../apache/doris/spark/client/entity/Frontend.java |  16 ++++
 .../spark/client/read/DorisFlightSqlReader.java    |   6 +-
 .../apache/doris/spark/util/LoadBalanceList.java   | 106 +++++++++++++++++++++
 .../doris/spark/util/LoadBalanceListTest.java      |  89 +++++++++++++++++
 6 files changed, 233 insertions(+), 17 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
index 1f28ae5..2df5cc1 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
@@ -18,6 +18,7 @@
 package org.apache.doris.spark.client;
 
 import org.apache.doris.spark.client.entity.Backend;
+import org.apache.doris.spark.util.LoadBalanceList;
 import org.apache.doris.spark.util.HttpUtil;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
@@ -32,12 +33,13 @@ import java.util.function.BiFunction;
 public class DorisBackendHttpClient implements Serializable {
 
     private static final Logger log = 
LoggerFactory.getLogger(DorisBackendHttpClient.class);
-    private final List<Backend> backends;
+
+    private final LoadBalanceList<Backend> backends;
 
     private transient CloseableHttpClient httpClient;
 
     public DorisBackendHttpClient(List<Backend> backends) {
-        this.backends = backends;
+        this.backends = new LoadBalanceList<>(backends);
     }
 
     public <T> T executeReq(BiFunction<Backend, CloseableHttpClient, T> 
reqFunc) throws Exception {
@@ -52,6 +54,7 @@ public class DorisBackendHttpClient implements Serializable {
                 }
             } catch (Exception e) {
                 log.warn("Failed to execute request on backend: {}:{}", 
backend.getHost(), backend.getHttpPort(), e);
+                backends.reportFailed(backend);
                 ex = e;
             }
         }
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index b0d1dc1..b99902b 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -28,6 +28,7 @@ import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.util.HttpUtil;
 import org.apache.doris.spark.util.HttpUtils;
+import org.apache.doris.spark.util.LoadBalanceList;
 import org.apache.doris.spark.util.URLs;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -73,7 +74,7 @@ public class DorisFrontendClient implements Serializable {
     private final DorisConfig config;
     private final String username;
     private final String password;
-    private final List<Frontend> frontends;
+    private final LoadBalanceList<Frontend> frontends;
     private final boolean isHttpsEnabled;
     private transient CloseableHttpClient httpClient;
 
@@ -83,7 +84,7 @@ public class DorisFrontendClient implements Serializable {
         this.password = null;
         this.httpClient = null;
         this.isHttpsEnabled = false;
-        this.frontends = Collections.emptyList();
+        this.frontends = new LoadBalanceList<>(Collections.emptyList());
     }
 
     public DorisFrontendClient(DorisConfig config) throws Exception {
@@ -94,17 +95,18 @@ public class DorisFrontendClient implements Serializable {
         this.frontends = initFrontends(config);
     }
 
-    private List<Frontend> initFrontends(DorisConfig config) throws Exception {
+    private LoadBalanceList<Frontend> initFrontends(DorisConfig config) throws 
Exception {
         String frontendNodes = config.getValue(DorisOptions.DORIS_FENODES);
         String[] frontendNodeArray = frontendNodes.split(",");
+        List<Frontend> frontendList = null;
         if (config.getValue(DorisOptions.DORIS_FE_AUTO_FETCH)) {
             Exception ex = null;
-            List<Frontend> frontendList = null;
             for (String frontendNode : frontendNodeArray) {
                 String[] nodeDetails = frontendNode.split(":");
                 try {
-                    List<Frontend> list = Collections.singletonList(new 
Frontend(nodeDetails[0],
-                            nodeDetails.length > 1 ? 
Integer.parseInt(nodeDetails[1]) : -1));
+                    LoadBalanceList<Frontend> list = new LoadBalanceList<>(
+                        Collections.singletonList(new Frontend(nodeDetails[0],
+                            nodeDetails.length > 1 ? 
Integer.parseInt(nodeDetails[1]) : -1)));
                     frontendList = requestFrontends(list, (frontend, client) 
-> {
                         String url = URLs.getFrontEndNodes(frontend.getHost(), 
frontend.getHttpPort(),
                                 isHttpsEnabled);
@@ -132,18 +134,18 @@ public class DorisFrontendClient implements Serializable {
                 }
                 throw new DorisException("frontend init fetch failed", ex);
             }
-            return frontendList;
+            return new LoadBalanceList<>(frontendList);
         } else {
             int queryPort = config.contains(DorisOptions.DORIS_QUERY_PORT) ?
                     config.getValue(DorisOptions.DORIS_QUERY_PORT) : -1;
             int flightSqlPort = 
config.contains(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) ?
                     config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) : 
-1;
-            return Arrays.stream(frontendNodeArray)
+            return new LoadBalanceList<>(Arrays.stream(frontendNodeArray)
                     .map(node -> {
                         String[] nodeParts = node.split(":");
                         return new Frontend(nodeParts[0], nodeParts.length > 1 
? Integer.parseInt(nodeParts[1]) : -1, queryPort, flightSqlPort);
                     })
-                    .collect(Collectors.toList());
+                    .collect(Collectors.toList()));
         }
     }
 
@@ -151,7 +153,7 @@ public class DorisFrontendClient implements Serializable {
         return requestFrontends(frontends, reqFunc);
     }
 
-    private <T> T requestFrontends(List<Frontend> frontEnds, 
BiFunction<Frontend, CloseableHttpClient, T> reqFunc) throws Exception {
+    private <T> T requestFrontends(LoadBalanceList<Frontend> frontEnds, 
BiFunction<Frontend, CloseableHttpClient, T> reqFunc) throws Exception {
         if (httpClient == null) {
             httpClient = HttpUtils.getHttpClient(config);
         }
@@ -163,6 +165,7 @@ public class DorisFrontendClient implements Serializable {
                 }
             } catch (Exception e) {
                 LOG.warn("fe http request on {} failed, err: {}", 
frontEnd.hostHttpPortString(), e.getMessage());
+                frontEnds.reportFailed(frontEnd);
                 ex = e;
             }
         }
@@ -370,6 +373,7 @@ public class DorisFrontendClient implements Serializable {
                     backends.add(new Backend(backendNode.get("ip").asText(), 
backendNode.get("http_port").asInt(), -1));
                 }
             }
+            Collections.shuffle(backends);
             return backends;
         });
     }
@@ -386,7 +390,7 @@ public class DorisFrontendClient implements Serializable {
         });
     }
 
-    public List<Frontend> getFrontends() {
+    public LoadBalanceList<Frontend> getFrontends() {
         return frontends;
     }
 
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Frontend.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Frontend.java
index a2858fb..8fd17a4 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Frontend.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/Frontend.java
@@ -18,6 +18,7 @@
 package org.apache.doris.spark.client.entity;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 public class Frontend implements Serializable {
 
@@ -66,4 +67,19 @@ public class Frontend implements Serializable {
         return host + ":" + queryPort;
     }
 
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) return true;
+               if (!(o instanceof Frontend)) return false;
+               Frontend frontend = (Frontend) o;
+               return httpPort == frontend.httpPort
+                               && queryPort == frontend.queryPort
+                               && flightSqlPort == frontend.flightSqlPort
+                               && Objects.equals(host, frontend.host);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(host, httpPort, queryPort, flightSqlPort);
+       }
 }
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 55298ff..112b2f8 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -48,7 +48,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Collections;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -66,10 +65,8 @@ public class DorisFlightSqlReader extends DorisReader {
     public DorisFlightSqlReader(DorisReaderPartition partition) throws 
Exception {
         super(partition);
         this.frontendClient = new DorisFrontendClient(partition.getConfig());
-        List<Frontend> frontends = new 
ArrayList<>(frontendClient.getFrontends());
-        Collections.shuffle(frontends);
         Exception tx = null;
-        for (Frontend frontend : frontends) {
+        for (Frontend frontend : frontendClient.getFrontends()) {
             try {
                 this.connection = initializeConnection(frontend, 
partition.getConfig());
                 tx = null;
@@ -77,6 +74,7 @@ public class DorisFlightSqlReader extends DorisReader {
             } catch (OptionRequiredException e) {
                 throw new DorisException("init adbc connection failed", e);
             } catch (AdbcException e) {
+                frontendClient.getFrontends().reportFailed(frontend);
                 log.warn("init adbc connection failed with fe: " + 
frontend.getHost(), e);
                 tx = new DorisException("init adbc connection failed", e);
             }
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/LoadBalanceList.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/LoadBalanceList.java
new file mode 100644
index 0000000..ff76d46
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/util/LoadBalanceList.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+package org.apache.doris.spark.util;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.List;
+import java.util.Iterator;
+import java.util.Queue;
+import java.util.PriorityQueue;
+import java.util.Collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class LoadBalanceList<T> implements Iterable<T>, Serializable {
+
+       private final List<T> list;
+
+       private final Map<T, FailedServer<T>> failedServers;
+
+       private final AtomicInteger globalOffset = new AtomicInteger(0);
+
+       private static final long FAILED_TIME_OUT = 60 * 60 * 1000;
+
+       public LoadBalanceList(List<T> servers) {
+               this.list = Collections.unmodifiableList(servers);
+               this.failedServers = new ConcurrentHashMap<>();
+       }
+
+       @Override
+       public Iterator<T> iterator() {
+               return new Iterator<T>() {
+                       final int offset = globalOffset.getAndAdd(1);
+                       final Queue<FailedServer<T>> skipServers = new 
PriorityQueue<>();
+                       int index = 0;
+
+                       @Override
+                       public boolean hasNext() {
+                               return index < list.size() || 
!skipServers.isEmpty();
+                       }
+
+                       @Override
+                       public T next() {
+                               if (index < list.size()) {
+                                       T server = list.get(Math.abs(offset + 
index++) % list.size());
+                                       FailedServer failedEntry = 
failedServers.get(server);
+                                       if (failedEntry != null) {
+                                               if (System.currentTimeMillis() 
- failedEntry.failedTime > FAILED_TIME_OUT) {
+                                                       
failedServers.remove(failedEntry.server);
+                                               } else {
+                                                       
skipServers.add(failedEntry);
+                                                       return next();
+                                               }
+                                       }
+                                       return server;
+                               } else {
+                                       return skipServers.poll().server;
+                               }
+                       }
+               };
+       }
+
+       public List<T> getList() {
+               return list;
+       }
+
+       public void reportFailed(T server) {
+               this.failedServers.put(server, new FailedServer<T>(server));
+       }
+
+       private static class FailedServer<T> implements 
Comparable<FailedServer<T>>, Serializable {
+
+               protected final T server;
+
+               protected final Long failedTime;
+
+               public FailedServer(T t) {
+                       this.server = t;
+                       this.failedTime = System.currentTimeMillis();
+               }
+
+
+               @Override
+               public int compareTo(FailedServer<T> o) {
+                       return this.failedTime.compareTo(o.failedTime);
+               }
+       }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
new file mode 100644
index 0000000..e4226e9
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Set;
+import java.util.List;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.ArrayList;
+
+public class LoadBalanceListTest {
+
+       @Test
+       public void testLoadBalanceList1() {
+               List<String> serverList = Arrays.asList("server1", "server2", 
"server3");
+               LoadBalanceList<String> loadBalanceList = new 
LoadBalanceList<>(serverList);
+               Set<String> testHeadSet = new HashSet<>();
+               for (int i = 0; i < 1000; i++) {
+                       List<String> testList = new ArrayList<>();
+                       int index = 0;
+                       for (String server : loadBalanceList) {
+                               testList.add(server);
+                               if (index++ == 0) {
+                                       testHeadSet.add(server);
+                               }
+                               System.out.println(server);
+                       }
+                       if (i % serverList.size() == 0) {
+                               
Assert.assertTrue(testList.equals(Arrays.asList("server1", "server2", 
"server3")));
+                       }
+
+                       if (i % serverList.size() == 1) {
+                               
Assert.assertTrue(testList.equals(Arrays.asList("server2", "server3", 
"server1")));
+                       }
+
+                       if (i % serverList.size() == 2) {
+                               
Assert.assertTrue(testList.equals(Arrays.asList("server3", "server1", 
"server2")));
+                       }
+
+                       System.out.println("---------");
+                       Assert.assertTrue(testList.size() == serverList.size());
+               }
+               Assert.assertTrue(testHeadSet.size() == serverList.size());
+       }
+
+       @Test
+       public void testLoadBalanceList2() throws InterruptedException {
+               List<String> serverList = Arrays.asList("server1", "server2", 
"server3", "server4");
+               LoadBalanceList<String> loadBalanceList = new 
LoadBalanceList<>(serverList);
+               Set<String> failedSet = new HashSet<>();
+               failedSet.add("server1");
+               loadBalanceList.reportFailed("server1");
+               Thread.sleep(10000);
+               failedSet.add("server4");
+               loadBalanceList.reportFailed("server4");
+               Set<String> serverSet = new HashSet<>();
+
+               for (int i = 0; i < 1000; i++) {
+                       int index = 0;
+                       for (String server : loadBalanceList) {
+                               serverSet.add(server);
+                               if (++index > loadBalanceList.getList().size() 
- failedSet.size()) {
+                                       
Assert.assertTrue(failedSet.contains(server));
+                               }
+                               System.out.println(server);
+                       }
+                       System.out.println("---------");
+                       Assert.assertTrue(serverSet.size() == 
serverList.size());
+               }
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to