This is an automated email from the ASF dual-hosted git repository.

JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new d0138d8  [fix] cloud compute group backend discovery (#361)
d0138d8 is described below

commit d0138d871c43a911ac5bd6373dd3377fca1bb3a0
Author: gnehil <[email protected]>
AuthorDate: Wed Jun 24 14:40:59 2026 +0800

    [fix] cloud compute group backend discovery (#361)
    
    * Fix cloud compute group backend discovery
    * Fall back to standard backends API when /rest/v2/manager/node/backends is unavailable on older Doris versions
---
 .../doris/spark/client/DorisFrontendClient.java    | 198 ++++++++++++++++++++-
 .../client/write/AbstractStreamLoadProcessor.java  |  18 +-
 .../scala/org/apache/doris/spark/util/URLs.scala   |   3 +
 .../spark/client/DorisFrontendClientTest.java      |  78 ++++++++
 .../write/AbstractStreamLoadProcessorTest.java     |  37 ++++
 .../org/apache/doris/spark/util/URLsTest.scala     |   8 +-
 6 files changed, 338 insertions(+), 4 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index b99902b..d9d0692 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -60,7 +60,10 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -70,6 +73,9 @@ public class DorisFrontendClient implements Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisFrontendClient.class);
 
     private static final ObjectMapper MAPPER = JsonMapper.builder().build();
+    private static final String MANAGER_BACKENDS = 
"/rest/v2/manager/node/backends";
+    private static final String COMPUTE_GROUP_NAME = "compute_group_name";
+    private static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
 
     private final DorisConfig config;
     private final String username;
@@ -355,6 +361,18 @@ public class DorisFrontendClient implements Serializable {
     }
 
     public List<Backend> getAliveBackends() throws Exception {
+        return getAliveBackends(null);
+    }
+
+    public List<Backend> getAliveBackends(String computeGroupName) throws 
Exception {
+        if (StringUtils.isNotBlank(computeGroupName)) {
+            try {
+                return getManagerBackends(computeGroupName);
+            } catch (Exception e) {
+                LOG.warn("Failed to get backends via 
/rest/v2/manager/node/backends for compute group '{}', "
+                        + "falling back to standard backends API. Error: {}", 
computeGroupName, e.getMessage());
+            }
+        }
         return requestFrontends((frontend, client) -> {
             String url = URLs.aliveBackend(frontend.getHost(), 
frontend.getHttpPort(), isHttpsEnabled);
             HttpGet httpGet = new HttpGet(url);
@@ -378,6 +396,184 @@ public class DorisFrontendClient implements Serializable {
         });
     }
 
+    private List<Backend> getManagerBackends(String computeGroupName) throws 
Exception {
+        return requestFrontends((frontend, client) -> {
+            String url = URLs.managerBackends(frontend.getHost(), 
frontend.getHttpPort(), isHttpsEnabled);
+            HttpGet httpGet = new HttpGet(url);
+            HttpUtils.setAuth(httpGet, username, password);
+            try (CloseableHttpResponse response = client.execute(httpGet)) {
+                if (response.getStatusLine().getStatusCode() != 
HttpStatus.SC_OK) {
+                    throw new RuntimeException("request fe with url: [" + url 
+ "] failed with http code: "
+                            + response.getStatusLine().getStatusCode() + ", 
reason: "
+                            + response.getStatusLine().getReasonPhrase());
+                }
+                String entity = EntityUtils.toString(response.getEntity());
+                List<Backend> backends = parseManagerBackends(entity, 
computeGroupName);
+                Collections.shuffle(backends);
+                return backends;
+            } catch (IOException e) {
+                throw new RuntimeException("get manager backends failed", e);
+            }
+        });
+    }
+
+    /**
+     * Parses a {@code /rest/v2/manager/node/backends} response and returns the alive backends
+     * that belong to the given compute group.
+     *
+     * <p>The payload may be the bare result object or wrapped in a {@code code}/{@code msg}/
+     * {@code data} envelope, and the column list may be called {@code columnNames} or
+     * {@code column_names}. Column names are matched case-insensitively. Rows whose
+     * {@code Alive} cell does not parse as {@code true}, or whose {@code Tag} does not name
+     * {@code computeGroupName}, are skipped.
+     *
+     * @param response raw JSON body returned by the frontend
+     * @param computeGroupName compute group to select; must not be blank
+     * @return non-empty list of backends built from the {@code Host} and {@code HttpPort} cells
+     * @throws RuntimeException if the compute group is blank, the body is not parseable JSON,
+     *         the expected columns or rows are missing, a row is not an array, a port is not
+     *         an integer, or no alive backend matches the compute group
+     */
+    static List<Backend> parseManagerBackends(String response, String computeGroupName) {
+        if (StringUtils.isBlank(computeGroupName)) {
+            throw managerBackendsException(computeGroupName, "compute group is empty");
+        }
+
+        JsonNode rootNode;
+        try {
+            rootNode = MAPPER.readTree(response);
+        } catch (IOException e) {
+            // Attach the parser error as the cause so its location and stack trace are not lost.
+            RuntimeException failure = managerBackendsException(computeGroupName,
+                    "Parse Doris manager backend response to json failed. res: " + response);
+            failure.initCause(e);
+            throw failure;
+        }
+
+        JsonNode dataNode = unwrapManagerBackendData(rootNode, computeGroupName);
+        JsonNode columnNode = dataNode.path("columnNames");
+        if (!columnNode.isArray()) {
+            // Some responses use the snake_case spelling instead.
+            columnNode = dataNode.path("column_names");
+        }
+        JsonNode rowNode = dataNode.path("rows");
+        if (!columnNode.isArray() || !rowNode.isArray()) {
+            throw managerBackendsException(computeGroupName,
+                    "response does not contain columnNames/column_names and rows");
+        }
+
+        Map<String, Integer> columnIndexes = getColumnIndexes(columnNode, computeGroupName);
+        int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName);
+        int httpPortIndex = requireColumn(columnIndexes, "HttpPort", computeGroupName);
+        int aliveIndex = requireColumn(columnIndexes, "Alive", computeGroupName);
+        int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName);
+
+        List<Backend> backends = new ArrayList<>();
+        for (JsonNode row : rowNode) {
+            if (!row.isArray()) {
+                throw managerBackendsException(computeGroupName, "backend row is not an array");
+            }
+            if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex))) {
+                continue;
+            }
+            String rowComputeGroupName = getComputeGroupNameFromTag(getManagerBackendCell(row, tagIndex));
+            if (!computeGroupName.equals(rowComputeGroupName)) {
+                continue;
+            }
+            String httpPort = getManagerBackendCell(row, httpPortIndex);
+            try {
+                backends.add(new Backend(getManagerBackendCell(row, hostIndex), Integer.parseInt(httpPort), -1));
+            } catch (NumberFormatException e) {
+                // Keep the original parse failure reachable through getCause().
+                RuntimeException failure = managerBackendsException(computeGroupName,
+                        "backend HttpPort is invalid: " + httpPort);
+                failure.initCause(e);
+                throw failure;
+            }
+        }
+
+        if (backends.isEmpty()) {
+            throw managerBackendsException(computeGroupName,
+                    "no alive backend found. If the target is a virtual compute group, "
+                            + "configure its physical active compute group");
+        }
+        return backends;
+    }
+
+    /**
+     * Strips the optional {@code code}/{@code msg}/{@code data} envelope from a manager response.
+     *
+     * @param rootNode parsed response root
+     * @param computeGroupName compute group name, used only for the error message
+     * @return {@code rootNode} itself when it has no {@code code} and {@code msg} fields,
+     *         otherwise its {@code data} node
+     * @throws RuntimeException if the envelope is present and its {@code code} is not {@code 0}
+     */
+    private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String computeGroupName) {
+        boolean hasEnvelope = rootNode.has("code") && rootNode.has("msg");
+        if (!hasEnvelope) {
+            return rootNode;
+        }
+        if ("0".equalsIgnoreCase(rootNode.path("code").asText())) {
+            return rootNode.path("data");
+        }
+        throw managerBackendsException(computeGroupName,
+                rootNode.path("msg").asText() + ": " + rootNode.path("data").asText());
+    }
+
+    /**
+     * Maps each non-blank column name, lower-cased, to its position in the column array.
+     *
+     * @param columnNode JSON array of column names (the caller checks {@code isArray()} first)
+     * @param computeGroupName compute group name, used only for the error message
+     * @return lower-cased column name to zero-based index; a repeated name keeps its last index
+     * @throws RuntimeException if no non-blank column name is present
+     */
+    private static Map<String, Integer> getColumnIndexes(JsonNode columnNode, String computeGroupName) {
+        Map<String, Integer> indexesByName = new HashMap<>();
+        int position = 0;
+        for (JsonNode column : columnNode) {
+            String name = column.asText();
+            if (!StringUtils.isBlank(name)) {
+                indexesByName.put(name.toLowerCase(), position);
+            }
+            position++;
+        }
+        if (indexesByName.isEmpty()) {
+            throw managerBackendsException(computeGroupName, "backend columns are empty");
+        }
+        return indexesByName;
+    }
+
+    /**
+     * Looks up the index of a mandatory column, ignoring case.
+     *
+     * @param columnIndexes lower-cased column name to index
+     * @param columnName column to find
+     * @param computeGroupName compute group name, used only for the error message
+     * @return index of the column
+     * @throws RuntimeException if the column is absent from {@code columnIndexes}
+     */
+    private static int requireColumn(Map<String, Integer> columnIndexes, String columnName,
+            String computeGroupName) {
+        Integer position = columnIndexes.get(columnName.toLowerCase());
+        if (position != null) {
+            return position;
+        }
+        throw managerBackendsException(computeGroupName,
+                "backend response missing required column " + columnName);
+    }
+
+    /**
+     * Reads one cell of a backend row as text.
+     *
+     * @param row JSON array holding the row's cells
+     * @param index position of the wanted cell
+     * @return the cell's text, or an empty string when the cell is absent or JSON {@code null}
+     */
+    private static String getManagerBackendCell(JsonNode row, int index) {
+        JsonNode value = row.get(index);
+        boolean missing = value == null || value.isNull();
+        return missing ? "" : value.asText();
+    }
+
+    /**
+     * Extracts the compute group name from a backend {@code Tag} value.
+     *
+     * @param tag tag string as accepted by {@code parseBackendTag}
+     * @return the {@code compute_group_name} entry when it is not blank, otherwise the
+     *         {@code cloud_cluster_name} entry (which may be {@code null})
+     */
+    static String getComputeGroupNameFromTag(String tag) {
+        Map<String, String> attributes = parseBackendTag(tag);
+        String preferred = attributes.get(COMPUTE_GROUP_NAME);
+        return StringUtils.isBlank(preferred) ? attributes.get(CLOUD_CLUSTER_NAME) : preferred;
+    }
+
+    /**
+     * Turns a backend tag string into a key/value map.
+     *
+     * <p>A tag that parses as a JSON object is read field by field. Anything else is treated as
+     * a comma-separated list of {@code key:value} pairs, optionally enclosed in braces, with
+     * surrounding quotes removed from keys and values; entries without a colon are ignored.
+     *
+     * @param tag raw tag text; may be blank
+     * @return mutable map of the parsed entries, empty when {@code tag} is blank
+     */
+    private static Map<String, String> parseBackendTag(String tag) {
+        Map<String, String> parsed = new HashMap<>();
+        if (StringUtils.isBlank(tag)) {
+            return parsed;
+        }
+
+        try {
+            JsonNode json = MAPPER.readTree(tag);
+            if (json.isObject()) {
+                for (Iterator<Map.Entry<String, JsonNode>> it = json.fields(); it.hasNext(); ) {
+                    Map.Entry<String, JsonNode> field = it.next();
+                    parsed.put(field.getKey(), field.getValue().asText());
+                }
+                return parsed;
+            }
+        } catch (IOException e) {
+            // Fall through to parse Doris PrintableMap style tag strings.
+        }
+
+        String body = tag.trim();
+        boolean braced = body.startsWith("{") && body.endsWith("}");
+        if (braced) {
+            body = body.substring(1, body.length() - 1);
+        }
+        for (String pair : body.split(",")) {
+            int colon = pair.indexOf(':');
+            if (colon < 0) {
+                // Not a key:value pair.
+                continue;
+            }
+            parsed.put(stripQuote(pair.substring(0, colon)), stripQuote(pair.substring(colon + 1)));
+        }
+        return parsed;
+    }
+
+    /**
+     * Trims a value and removes one pair of matching surrounding quotes, single or double.
+     *
+     * @param value text to clean; must not be {@code null}
+     * @return the trimmed text without its enclosing quote pair, or just the trimmed text when
+     *         it is not enclosed in matching quotes
+     */
+    private static String stripQuote(String value) {
+        String trimmed = value.trim();
+        int lastIndex = trimmed.length() - 1;
+        if (lastIndex < 1) {
+            // Fewer than two characters cannot hold a quote pair.
+            return trimmed;
+        }
+        char opening = trimmed.charAt(0);
+        boolean isQuoteChar = opening == '"' || opening == '\'';
+        if (isQuoteChar && trimmed.charAt(lastIndex) == opening) {
+            return trimmed.substring(1, lastIndex);
+        }
+        return trimmed;
+    }
+
+    private static RuntimeException managerBackendsException(String 
computeGroupName, String reason) {
+        return new RuntimeException(String.format(
+                "Failed to get backends for compute group '%s' from %s: %s. 
Required privileges: information_schema SELECT on Doris 3.x/4.x, or ADMIN on 
Doris 2.1.",
+                computeGroupName, MANAGER_BACKENDS, reason));
+    }
+
     public void truncateTable(String database, String table) throws Exception {
         queryFrontends(conn -> {
             String sql = "TRUNCATE TABLE " + database + "." + table;
@@ -400,4 +596,4 @@ public class DorisFrontendClient implements Serializable {
         }
     }
 
-}
\ No newline at end of file
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 1ee3711..e826de6 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -34,6 +34,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
@@ -71,6 +72,8 @@ public abstract class AbstractStreamLoadProcessor<R> extends 
DorisWriter<R> impl
     protected static final JsonMapper MAPPER =
             
JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
 false).build();
     private static final int arrowBufferSize = 1000;
+    private static final String CLOUD_CLUSTER = "cloud_cluster";
+    private static final String COMPUTE_GROUP = "compute_group";
     protected final Logger logger = 
LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
     protected final DorisConfig config;
     private final DorisFrontendClient frontend;
@@ -105,9 +108,9 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
         this.table = dbTableArr[1].replaceAll("`", "").trim();
         this.frontend = new DorisFrontendClient(config);
         this.autoRedirect = 
config.getValue(DorisOptions.DORIS_SINK_AUTO_REDIRECT);
+        this.properties = config.getSinkProperties();
         this.backendHttpClient = autoRedirect ? null : new 
DorisBackendHttpClient(getBackends());
         this.isHttpsEnabled = config.getValue(DorisOptions.DORIS_ENABLE_HTTPS);
-        this.properties = config.getSinkProperties();
         // init stream load props
         this.isTwoPhaseCommitEnabled = 
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC);
         this.format = DataFormat.valueOf(properties.getOrDefault("format", 
"csv").toUpperCase());
@@ -456,8 +459,19 @@ public abstract class AbstractStreamLoadProcessor<R> 
extends DorisWriter<R> impl
                 return new Backend(beNodeArr[0], 
Integer.valueOf(beNodeArr[1]), -1);
             }).collect(Collectors.toList());
         } else {
-            return frontend.getAliveBackends();
+            return 
frontend.getAliveBackends(getLoadTargetComputeGroup(properties));
+        }
+    }
+
+    /**
+     * Picks the compute group a load is aimed at from the sink properties.
+     *
+     * @param properties sink properties; may be {@code null}
+     * @return the {@code compute_group} value when it is not blank, otherwise the
+     *         {@code cloud_cluster} value; {@code null} when {@code properties} is {@code null}
+     *         or neither key is usable
+     */
+    static String getLoadTargetComputeGroup(Map<String, String> properties) {
+        if (properties == null) {
+            return null;
+        }
+        String target = properties.get(COMPUTE_GROUP);
+        if (StringUtils.isBlank(target)) {
+            target = properties.get(CLOUD_CLUSTER);
         }
+        return target;
     }
 
     protected abstract R copy(R row);
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
index c88bb49..b4e868a 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
@@ -56,6 +56,9 @@ object URLs {
   def getFrontEndNodes(host: String, port: Int, enableHttps: Boolean = false) =
     s"${schema(enableHttps)}://${assemblePath(host, 
port)}/rest/v2/manager/node/frontends"
 
+  /** Builds the URL of a frontend's `/rest/v2/manager/node/backends` endpoint. */
+  def managerBackends(host: String, port: Int, enableHttps: Boolean = false) = {
+    val scheme = schema(enableHttps)
+    val authority = assemblePath(host, port)
+    s"$scheme://$authority/rest/v2/manager/node/backends"
+  }
+
   def copyIntoUpload(host: String, port: Int, enableHttps: Boolean = false) =
     s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/upload"
 
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/DorisFrontendClientTest.java
 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/DorisFrontendClientTest.java
new file mode 100644
index 0000000..aa123af
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/DorisFrontendClientTest.java
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.client;
+
+import org.apache.doris.spark.client.entity.Backend;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/** Tests for the manager-backends response parsing helpers of {@link DorisFrontendClient}. */
+public class DorisFrontendClientTest {
+
+    /** Opening of an unwrapped response: the camelCase column list up to its trailing comma. */
+    private static final String CAMEL_CASE_COLUMNS =
+            "{\"columnNames\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"],";
+
+    /** Asserts that the parse result is exactly the single backend 192.168.1.1:8042. */
+    private static void assertOnlyFirstBackend(List<Backend> selected) {
+        Assert.assertEquals(1, selected.size());
+        Assert.assertEquals(new Backend("192.168.1.1", 8042, -1), selected.get(0));
+    }
+
+    /** Dead backends and backends of other compute groups are filtered out. */
+    @Test
+    public void parseManagerBackendsFilterComputeGroup() {
+        String payload = CAMEL_CASE_COLUMNS
+                + "\"rows\":["
+                + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"],"
+                + "[\"2\",\"192.168.1.2\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_b\\\"}\"],"
+                + "[\"3\",\"192.168.1.3\",\"8042\",\"false\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"]"
+                + "]}";
+
+        assertOnlyFirstBackend(DorisFrontendClient.parseManagerBackends(payload, "cluster_a"));
+    }
+
+    /** The enveloped form with snake_case columns and a cloud_cluster_name tag is accepted. */
+    @Test
+    public void parseManagerBackendsColumnNamesAndCloudCluster() {
+        String payload = "{\"code\":0,\"msg\":\"success\",\"data\":{"
+                + "\"column_names\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"],"
+                + "\"rows\":["
+                + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"cloud_cluster_name\\\":\\\"cluster_a\\\"}\"]"
+                + "]}}";
+
+        assertOnlyFirstBackend(DorisFrontendClient.parseManagerBackends(payload, "cluster_a"));
+    }
+
+    /** A tag that is not JSON is still parsed as key:value pairs. */
+    @Test
+    public void parseManagerBackendsPrintableTag() {
+        String printableTag = "{compute_group_name:cluster_a, location:default}";
+        Assert.assertEquals("cluster_a", DorisFrontendClient.getComputeGroupNameFromTag(printableTag));
+    }
+
+    /** A compute group whose only backend is dead yields the "no alive backend" error. */
+    @Test
+    public void parseManagerBackendsNoTargetBackend() {
+        String payload = CAMEL_CASE_COLUMNS
+                + "\"rows\":["
+                + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"],"
+                + "[\"2\",\"192.168.1.2\",\"8042\",\"false\",\"{\\\"compute_group_name\\\":\\\"cluster_b\\\"}\"]"
+                + "]}";
+
+        try {
+            DorisFrontendClient.parseManagerBackends(payload, "cluster_b");
+            Assert.fail("Expected no alive backend exception");
+        } catch (RuntimeException expected) {
+            Assert.assertTrue(expected.getMessage().contains("no alive backend found"));
+        }
+    }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessorTest.java
 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessorTest.java
new file mode 100644
index 0000000..b63218a
--- /dev/null
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessorTest.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.client.write;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Tests for the static helpers of {@link AbstractStreamLoadProcessor}. */
+public class AbstractStreamLoadProcessorTest {
+
+    /** {@code cloud_cluster} is used on its own, and {@code compute_group} wins once it is set. */
+    @Test
+    public void getLoadTargetComputeGroup() {
+        Map<String, String> sinkProperties = new HashMap<>();
+
+        sinkProperties.put("cloud_cluster", "cluster_a");
+        String fromCloudCluster = AbstractStreamLoadProcessor.getLoadTargetComputeGroup(sinkProperties);
+        Assert.assertEquals("cluster_a", fromCloudCluster);
+
+        sinkProperties.put("compute_group", "cluster_b");
+        String fromComputeGroup = AbstractStreamLoadProcessor.getLoadTargetComputeGroup(sinkProperties);
+        Assert.assertEquals("cluster_b", fromComputeGroup);
+    }
+}
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
index 3784885..ff977c0 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
@@ -18,7 +18,7 @@
 package org.apache.doris.spark.util
 
 import org.junit.Assert
-import org.junit.jupiter.api.Test
+import org.junit.Test
 
 class URLsTest {
 
@@ -58,4 +58,10 @@ class URLsTest {
     Assert.assertEquals(URLs.getFrontEndNodes("127.0.0.1", 8030, enableHttps = true), "https://127.0.0.1:8030/rest/v2/manager/node/frontends")
   }
 
+  /** Checks the manager backends URL for both the default http scheme and https. */
+  @Test
+  def managerBackendsTest(): Unit = {
+    Assert.assertEquals(URLs.managerBackends("127.0.0.1", 8030),
+      "http://127.0.0.1:8030/rest/v2/manager/node/backends")
+    Assert.assertEquals(URLs.managerBackends("127.0.0.1", 8030, enableHttps = true),
+      "https://127.0.0.1:8030/rest/v2/manager/node/backends")
+  }
+
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to