This is an automated email from the ASF dual-hosted git repository.
JNSimba pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d0138d8 [fix] cloud compute group backend discovery (#361)
d0138d8 is described below
commit d0138d871c43a911ac5bd6373dd3377fca1bb3a0
Author: gnehil <[email protected]>
AuthorDate: Wed Jun 24 14:40:59 2026 +0800
[fix] cloud compute group backend discovery (#361)
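* Fix backend discovery for cloud compute groups: resolve the alive backends of the compute group named by the `compute_group` (or `cloud_cluster`) sink property via /rest/v2/manager/node/backends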
* Fall back to the standard backends API when /rest/v2/manager/node/backends is
unavailable on older Doris versions
---
.../doris/spark/client/DorisFrontendClient.java | 198 ++++++++++++++++++++-
.../client/write/AbstractStreamLoadProcessor.java | 18 +-
.../scala/org/apache/doris/spark/util/URLs.scala | 3 +
.../spark/client/DorisFrontendClientTest.java | 78 ++++++++
.../write/AbstractStreamLoadProcessorTest.java | 37 ++++
.../org/apache/doris/spark/util/URLsTest.scala | 8 +-
6 files changed, 338 insertions(+), 4 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index b99902b..d9d0692 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -60,7 +60,10 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -70,6 +73,9 @@ public class DorisFrontendClient implements Serializable {
private static final Logger LOG =
LoggerFactory.getLogger(DorisFrontendClient.class);
private static final ObjectMapper MAPPER = JsonMapper.builder().build();
+ // Path of the FE manager backends endpoint; in the visible code it is only used in managerBackendsException messages.
+ private static final String MANAGER_BACKENDS =
"/rest/v2/manager/node/backends";
+ // Backend Tag keys holding the compute group name: compute_group_name is read first, cloud_cluster_name is the fallback.
+ private static final String COMPUTE_GROUP_NAME = "compute_group_name";
+ private static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
private final DorisConfig config;
private final String username;
@@ -355,6 +361,18 @@ public class DorisFrontendClient implements Serializable {
}
+ /**
+ * Returns alive backends without compute-group filtering; equivalent to {@code getAliveBackends(null)},
+ * which skips the manager endpoint and queries the standard alive-backends URL.
+ */
public List<Backend> getAliveBackends() throws Exception {
+ return getAliveBackends(null);
+ }
+
+ public List<Backend> getAliveBackends(String computeGroupName) throws
Exception {
+ if (StringUtils.isNotBlank(computeGroupName)) {
+ try {
+ return getManagerBackends(computeGroupName);
+ } catch (Exception e) {
+ LOG.warn("Failed to get backends via
/rest/v2/manager/node/backends for compute group '{}', "
+ + "falling back to standard backends API. Error: {}",
computeGroupName, e.getMessage());
+ }
+ }
return requestFrontends((frontend, client) -> {
String url = URLs.aliveBackend(frontend.getHost(),
frontend.getHttpPort(), isHttpsEnabled);
HttpGet httpGet = new HttpGet(url);
@@ -378,6 +396,184 @@ public class DorisFrontendClient implements Serializable {
});
}
+ private List<Backend> getManagerBackends(String computeGroupName) throws
Exception {
+ return requestFrontends((frontend, client) -> {
+ String url = URLs.managerBackends(frontend.getHost(),
frontend.getHttpPort(), isHttpsEnabled);
+ HttpGet httpGet = new HttpGet(url);
+ HttpUtils.setAuth(httpGet, username, password);
+ try (CloseableHttpResponse response = client.execute(httpGet)) {
+ if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
+ throw new RuntimeException("request fe with url: [" + url
+ "] failed with http code: "
+ + response.getStatusLine().getStatusCode() + ",
reason: "
+ + response.getStatusLine().getReasonPhrase());
+ }
+ String entity = EntityUtils.toString(response.getEntity());
+ List<Backend> backends = parseManagerBackends(entity,
computeGroupName);
+ Collections.shuffle(backends);
+ return backends;
+ } catch (IOException e) {
+ throw new RuntimeException("get manager backends failed", e);
+ }
+ });
+ }
+
+ static List<Backend> parseManagerBackends(String response, String
computeGroupName) {
+ if (StringUtils.isBlank(computeGroupName)) {
+ throw managerBackendsException(computeGroupName, "compute group is
empty");
+ }
+
+ JsonNode rootNode;
+ try {
+ rootNode = MAPPER.readTree(response);
+ } catch (IOException e) {
+ throw managerBackendsException(computeGroupName, "Parse Doris
manager backend response to json failed. res: " + response);
+ }
+
+ JsonNode dataNode = unwrapManagerBackendData(rootNode,
computeGroupName);
+ JsonNode columnNode = dataNode.path("columnNames");
+ if (!columnNode.isArray()) {
+ columnNode = dataNode.path("column_names");
+ }
+ JsonNode rowNode = dataNode.path("rows");
+ if (!columnNode.isArray() || !rowNode.isArray()) {
+ throw managerBackendsException(computeGroupName, "response does
not contain columnNames/column_names and rows");
+ }
+
+ Map<String, Integer> columnIndexes = getColumnIndexes(columnNode,
computeGroupName);
+ int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName);
+ int httpPortIndex = requireColumn(columnIndexes, "HttpPort",
computeGroupName);
+ int aliveIndex = requireColumn(columnIndexes, "Alive",
computeGroupName);
+ int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName);
+
+ List<Backend> backends = new ArrayList<>();
+ for (JsonNode row : rowNode) {
+ if (!row.isArray()) {
+ throw managerBackendsException(computeGroupName, "backend row
is not an array");
+ }
+ if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex)))
{
+ continue;
+ }
+ String rowComputeGroupName =
getComputeGroupNameFromTag(getManagerBackendCell(row, tagIndex));
+ if (!computeGroupName.equals(rowComputeGroupName)) {
+ continue;
+ }
+ String httpPort = getManagerBackendCell(row, httpPortIndex);
+ try {
+ backends.add(new Backend(getManagerBackendCell(row,
hostIndex), Integer.parseInt(httpPort), -1));
+ } catch (NumberFormatException e) {
+ throw managerBackendsException(computeGroupName, "backend
HttpPort is invalid: " + httpPort);
+ }
+ }
+
+ if (backends.isEmpty()) {
+ throw managerBackendsException(computeGroupName,
+ "no alive backend found. If the target is a virtual
compute group, configure its physical active compute group");
+ }
+ return backends;
+ }
+
+    /**
+     * Strips the optional {@code {"code", "msg", "data"}} envelope from an FE REST response.
+     *
+     * @param rootNode parsed response body
+     * @param computeGroupName compute group being resolved, used only in error messages
+     * @return the {@code data} node when both {@code code} and {@code msg} are present,
+     *         otherwise {@code rootNode} unchanged
+     * @throws RuntimeException if the envelope's {@code code} is not {@code "0"}
+     */
+    private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String computeGroupName) {
+        if (!rootNode.has("code") || !rootNode.has("msg")) {
+            return rootNode;
+        }
+        JsonNode data = rootNode.path("data");
+        if (!"0".equalsIgnoreCase(rootNode.path("code").asText())) {
+            // asText() returns "" for object/array payloads; serialize those so the detail is not lost.
+            String detail = data.isContainerNode() ? data.toString() : data.asText();
+            throw managerBackendsException(computeGroupName, rootNode.path("msg").asText() + ": " + detail);
+        }
+        return data;
+    }
+
+    /**
+     * Maps each non-blank column name, lower-cased with {@code Locale.ROOT}, to its position.
+     * When a name occurs more than once, the last position wins.
+     *
+     * @param columnNode JSON array of column names
+     * @param computeGroupName compute group being resolved, used only in error messages
+     * @return lower-cased column name to index
+     * @throws RuntimeException if no non-blank column name is present
+     */
+    private static Map<String, Integer> getColumnIndexes(JsonNode columnNode, String computeGroupName) {
+        Map<String, Integer> columnIndexes = new HashMap<>();
+        for (int i = 0; i < columnNode.size(); i++) {
+            String columnName = columnNode.get(i).asText();
+            if (StringUtils.isNotBlank(columnName)) {
+                // Locale.ROOT keeps matching stable under locales such as Turkish (dotless i).
+                columnIndexes.put(columnName.toLowerCase(java.util.Locale.ROOT), i);
+            }
+        }
+        if (columnIndexes.isEmpty()) {
+            throw managerBackendsException(computeGroupName, "backend columns are empty");
+        }
+        return columnIndexes;
+    }
+
+    /**
+     * Looks up the index of {@code columnName}, matched case-insensitively via {@code Locale.ROOT}.
+     *
+     * @param columnIndexes lower-cased column name to index
+     * @param columnName required column, in any case
+     * @param computeGroupName compute group being resolved, used only in error messages
+     * @return the column's index
+     * @throws RuntimeException if the column is absent
+     */
+    private static int requireColumn(Map<String, Integer> columnIndexes, String columnName, String computeGroupName) {
+        Integer index = columnIndexes.get(columnName.toLowerCase(java.util.Locale.ROOT));
+        if (index == null) {
+            throw managerBackendsException(computeGroupName, "backend response missing required column " + columnName);
+        }
+        return index;
+    }
+
+ private static String getManagerBackendCell(JsonNode row, int index) {
+ JsonNode cell = row.get(index);
+ if (cell == null || cell.isNull()) {
+ return "";
+ }
+ return cell.asText();
+ }
+
+ static String getComputeGroupNameFromTag(String tag) {
+ Map<String, String> tagMap = parseBackendTag(tag);
+ String computeGroupName = tagMap.get(COMPUTE_GROUP_NAME);
+ if (StringUtils.isNotBlank(computeGroupName)) {
+ return computeGroupName;
+ }
+ return tagMap.get(CLOUD_CLUSTER_NAME);
+ }
+
+ private static Map<String, String> parseBackendTag(String tag) {
+ Map<String, String> tagMap = new HashMap<>();
+ if (StringUtils.isBlank(tag)) {
+ return tagMap;
+ }
+
+ try {
+ JsonNode tagNode = MAPPER.readTree(tag);
+ if (tagNode.isObject()) {
+ Iterator<Map.Entry<String, JsonNode>> fields =
tagNode.fields();
+ while (fields.hasNext()) {
+ Map.Entry<String, JsonNode> entry = fields.next();
+ tagMap.put(entry.getKey(), entry.getValue().asText());
+ }
+ return tagMap;
+ }
+ } catch (IOException e) {
+ // Fall through to parse Doris PrintableMap style tag strings.
+ }
+
+ String tagContent = tag.trim();
+ if (tagContent.startsWith("{") && tagContent.endsWith("}")) {
+ tagContent = tagContent.substring(1, tagContent.length() - 1);
+ }
+ for (String entry : tagContent.split(",")) {
+ String[] keyValue = entry.split(":", 2);
+ if (keyValue.length != 2) {
+ continue;
+ }
+ tagMap.put(stripQuote(keyValue[0]), stripQuote(keyValue[1]));
+ }
+ return tagMap;
+ }
+
+ private static String stripQuote(String value) {
+ String result = value.trim();
+ if (result.length() >= 2) {
+ char first = result.charAt(0);
+ char last = result.charAt(result.length() - 1);
+ if ((first == '"' && last == '"') || (first == '\'' && last ==
'\'')) {
+ return result.substring(1, result.length() - 1);
+ }
+ }
+ return result;
+ }
+
+ private static RuntimeException managerBackendsException(String
computeGroupName, String reason) {
+ return new RuntimeException(String.format(
+ "Failed to get backends for compute group '%s' from %s: %s.
Required privileges: information_schema SELECT on Doris 3.x/4.x, or ADMIN on
Doris 2.1.",
+ computeGroupName, MANAGER_BACKENDS, reason));
+ }
+
public void truncateTable(String database, String table) throws Exception {
queryFrontends(conn -> {
String sql = "TRUNCATE TABLE " + database + "." + table;
@@ -400,4 +596,4 @@ public class DorisFrontendClient implements Serializable {
}
}
-}
\ No newline at end of file
+}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 1ee3711..e826de6 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -34,6 +34,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
@@ -71,6 +72,8 @@ public abstract class AbstractStreamLoadProcessor<R> extends
DorisWriter<R> impl
protected static final JsonMapper MAPPER =
JsonMapper.builder().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false).build();
private static final int arrowBufferSize = 1000;
+ // Sink property keys naming the load's target compute group; getLoadTargetComputeGroup prefers compute_group when it is non-blank.
+ private static final String CLOUD_CLUSTER = "cloud_cluster";
+ private static final String COMPUTE_GROUP = "compute_group";
protected final Logger logger =
LoggerFactory.getLogger(this.getClass().getName().replaceAll("\\$", ""));
protected final DorisConfig config;
private final DorisFrontendClient frontend;
@@ -105,9 +108,9 @@ public abstract class AbstractStreamLoadProcessor<R>
extends DorisWriter<R> impl
this.table = dbTableArr[1].replaceAll("`", "").trim();
this.frontend = new DorisFrontendClient(config);
this.autoRedirect =
config.getValue(DorisOptions.DORIS_SINK_AUTO_REDIRECT);
+ this.properties = config.getSinkProperties();
this.backendHttpClient = autoRedirect ? null : new
DorisBackendHttpClient(getBackends());
this.isHttpsEnabled = config.getValue(DorisOptions.DORIS_ENABLE_HTTPS);
- this.properties = config.getSinkProperties();
// init stream load props
this.isTwoPhaseCommitEnabled =
config.getValue(DorisOptions.DORIS_SINK_ENABLE_2PC);
this.format = DataFormat.valueOf(properties.getOrDefault("format",
"csv").toUpperCase());
@@ -456,8 +459,19 @@ public abstract class AbstractStreamLoadProcessor<R>
extends DorisWriter<R> impl
return new Backend(beNodeArr[0],
Integer.valueOf(beNodeArr[1]), -1);
}).collect(Collectors.toList());
} else {
- return frontend.getAliveBackends();
+ return
frontend.getAliveBackends(getLoadTargetComputeGroup(properties));
+ }
+ }
+
+ /**
+ * Resolves the compute group a stream load should target from the sink properties.
+ *
+ * @param properties sink properties; may be null
+ * @return the {@code compute_group} value when non-blank, otherwise the {@code cloud_cluster}
+ * value as-is (null when the key is absent or {@code properties} is null)
+ */
+ static String getLoadTargetComputeGroup(Map<String, String> properties) {
+ if (properties == null) {
+ return null;
+ }
+ String computeGroup = properties.get(COMPUTE_GROUP);
+ if (StringUtils.isNotBlank(computeGroup)) {
+ return computeGroup;
}
+ // compute_group is absent or blank: fall back to cloud_cluster (may itself be null or blank).
+ return properties.get(CLOUD_CLUSTER);
}
protected abstract R copy(R row);
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
index c88bb49..b4e868a 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/URLs.scala
@@ -56,6 +56,9 @@ object URLs {
def getFrontEndNodes(host: String, port: Int, enableHttps: Boolean = false) =
s"${schema(enableHttps)}://${assemblePath(host,
port)}/rest/v2/manager/node/frontends"
+  /** URL of the FE manager endpoint that lists backends together with their tags. */
+  def managerBackends(host: String, port: Int, enableHttps: Boolean = false): String = {
+    val base = s"${schema(enableHttps)}://${assemblePath(host, port)}"
+    s"$base/rest/v2/manager/node/backends"
+  }
+
def copyIntoUpload(host: String, port: Int, enableHttps: Boolean = false) =
s"${schema(enableHttps)}://${assemblePath(host, port)}/copy/upload"
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/DorisFrontendClientTest.java
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/DorisFrontendClientTest.java
new file mode 100644
index 0000000..aa123af
--- /dev/null
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/DorisFrontendClientTest.java
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.client;
+
+import org.apache.doris.spark.client.entity.Backend;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Unit tests for parsing the {@code /rest/v2/manager/node/backends} response and backend tags.
+ */
+public class DorisFrontendClientTest {
+
+    /** Asserts that parsing fails with a RuntimeException whose message contains {@code expected}. */
+    private static void assertParseFails(String response, String computeGroup, String expected) {
+        try {
+            DorisFrontendClient.parseManagerBackends(response, computeGroup);
+            Assert.fail("Expected parse failure containing: " + expected);
+        } catch (RuntimeException e) {
+            Assert.assertTrue("Unexpected message: " + e.getMessage(), e.getMessage().contains(expected));
+        }
+    }
+
+    @Test
+    public void parseManagerBackendsFilterComputeGroup() {
+        String response = "{\"columnNames\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"],"
+                + "\"rows\":["
+                + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"],"
+                + "[\"2\",\"192.168.1.2\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_b\\\"}\"],"
+                + "[\"3\",\"192.168.1.3\",\"8042\",\"false\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"]"
+                + "]}";
+
+        List<Backend> backends = DorisFrontendClient.parseManagerBackends(response, "cluster_a");
+
+        Assert.assertEquals(1, backends.size());
+        Assert.assertEquals(new Backend("192.168.1.1", 8042, -1), backends.get(0));
+    }
+
+    @Test
+    public void parseManagerBackendsColumnNamesAndCloudCluster() {
+        String response = "{\"code\":0,\"msg\":\"success\",\"data\":{"
+                + "\"column_names\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"],"
+                + "\"rows\":["
+                + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"cloud_cluster_name\\\":\\\"cluster_a\\\"}\"]"
+                + "]}}";
+
+        List<Backend> backends = DorisFrontendClient.parseManagerBackends(response, "cluster_a");
+
+        Assert.assertEquals(1, backends.size());
+        Assert.assertEquals(new Backend("192.168.1.1", 8042, -1), backends.get(0));
+    }
+
+    @Test
+    public void parseManagerBackendsLowerCaseColumnsAndQuotedPrintableTag() {
+        String response = "{\"columnNames\":[\"host\",\"httpport\",\"alive\",\"tag\"],"
+                + "\"rows\":["
+                + "[\"192.168.1.4\",\"8040\",\"true\",\"{'compute_group_name':'cluster_a'}\"]"
+                + "]}";
+
+        List<Backend> backends = DorisFrontendClient.parseManagerBackends(response, "cluster_a");
+
+        Assert.assertEquals(1, backends.size());
+        Assert.assertEquals(new Backend("192.168.1.4", 8040, -1), backends.get(0));
+    }
+
+    @Test
+    public void parseManagerBackendsPrintableTag() {
+        Assert.assertEquals("cluster_a",
+                DorisFrontendClient.getComputeGroupNameFromTag("{compute_group_name:cluster_a, location:default}"));
+    }
+
+    @Test
+    public void parseManagerBackendsCloudClusterFallbackWhenComputeGroupBlank() {
+        Assert.assertEquals("cluster_a", DorisFrontendClient.getComputeGroupNameFromTag(
+                "{\"compute_group_name\":\"\",\"cloud_cluster_name\":\"cluster_a\"}"));
+    }
+
+    @Test
+    public void parseManagerBackendsNoTargetBackend() {
+        String response = "{\"columnNames\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"],"
+                + "\"rows\":["
+                + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"],"
+                + "[\"2\",\"192.168.1.2\",\"8042\",\"false\",\"{\\\"compute_group_name\\\":\\\"cluster_b\\\"}\"]"
+                + "]}";
+
+        assertParseFails(response, "cluster_b", "no alive backend found");
+    }
+
+    @Test
+    public void parseManagerBackendsErrorEnvelope() {
+        assertParseFails("{\"code\":403,\"msg\":\"Forbidden\",\"data\":\"Access denied\"}",
+                "cluster_a", "Forbidden: Access denied");
+    }
+
+    @Test
+    public void parseManagerBackendsMissingTagColumn() {
+        assertParseFails("{\"columnNames\":[\"Host\",\"HttpPort\",\"Alive\"],\"rows\":[]}",
+                "cluster_a", "missing required column Tag");
+    }
+
+    @Test
+    public void parseManagerBackendsBlankComputeGroup() {
+        assertParseFails("{}", " ", "compute group is empty");
+    }
+}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessorTest.java
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessorTest.java
new file mode 100644
index 0000000..b63218a
--- /dev/null
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessorTest.java
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.client.write;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for resolving the stream-load target compute group from sink properties.
+ */
+public class AbstractStreamLoadProcessorTest {
+
+    @Test
+    public void getLoadTargetComputeGroup() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("cloud_cluster", "cluster_a");
+        Assert.assertEquals("cluster_a", AbstractStreamLoadProcessor.getLoadTargetComputeGroup(properties));
+
+        // compute_group takes precedence over cloud_cluster when both are set.
+        properties.put("compute_group", "cluster_b");
+        Assert.assertEquals("cluster_b", AbstractStreamLoadProcessor.getLoadTargetComputeGroup(properties));
+    }
+
+    @Test
+    public void getLoadTargetComputeGroupMissing() {
+        Assert.assertNull(AbstractStreamLoadProcessor.getLoadTargetComputeGroup(null));
+        Assert.assertNull(AbstractStreamLoadProcessor.getLoadTargetComputeGroup(new HashMap<>()));
+    }
+
+    @Test
+    public void getLoadTargetComputeGroupBlankComputeGroupFallsBack() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compute_group", " ");
+        properties.put("cloud_cluster", "cluster_a");
+        Assert.assertEquals("cluster_a", AbstractStreamLoadProcessor.getLoadTargetComputeGroup(properties));
+    }
+}
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
index 3784885..ff977c0 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/URLsTest.scala
@@ -18,7 +18,7 @@
package org.apache.doris.spark.util
import org.junit.Assert
-import org.junit.jupiter.api.Test
+import org.junit.Test
class URLsTest {
@@ -58,4 +58,10 @@ class URLsTest {
Assert.assertEquals(URLs.getFrontEndNodes("127.0.0.1", 8030, enableHttps =
true), "https://127.0.0.1:8030/rest/v2/manager/node/frontends")
}
+  /** Checks the manager-backends URL for both http and https schemes. */
+  @Test
+  def managerBackendsTest(): Unit = {
+    // JUnit's contract is assertEquals(expected, actual); the right order keeps failure messages accurate.
+    Assert.assertEquals("http://127.0.0.1:8030/rest/v2/manager/node/backends",
+      URLs.managerBackends("127.0.0.1", 8030))
+    Assert.assertEquals("https://127.0.0.1:8030/rest/v2/manager/node/backends",
+      URLs.managerBackends("127.0.0.1", 8030, enableHttps = true))
+  }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]