Copilot commented on code in PR #361:
URL:
https://github.com/apache/doris-spark-connector/pull/361#discussion_r3346626840
##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java:
##########
@@ -355,6 +361,13 @@ public String[] getTableAllColumns(String db, String table) throws Exception {
     }
     public List<Backend> getAliveBackends() throws Exception {
+        return getAliveBackends(null);
+    }
+
+    public List<Backend> getAliveBackends(String computeGroupName) throws Exception {
+        if (StringUtils.isNotBlank(computeGroupName)) {
+            return getManagerBackends(computeGroupName);
+        }
Review Comment:
`computeGroupName` is used for an exact string match in backend filtering.
If callers pass a value with surrounding whitespace, this will trigger
manager-backend discovery but then fail to match tags (leading to a misleading
"no alive backend" error). Trim the parameter once and use the trimmed value
throughout.
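
A minimal sketch of the "trim once" suggestion, using only the JDK. The class `ComputeGroupNameSketch` and its `matchingTags` helper are hypothetical stand-ins for the connector's backend filter, not code from the PR; `StringUtils.trimToNull` from Commons Lang would be a comparable ready-made helper.

```java
import java.util.ArrayList;
import java.util.List;

final class ComputeGroupNameSketch {

    /** Returns the trimmed name, or null when the input is null or only whitespace. */
    static String normalize(String computeGroupName) {
        if (computeGroupName == null) {
            return null;
        }
        String trimmed = computeGroupName.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    /**
     * Hypothetical stand-in for the tag filter: keeps the tags that equal the
     * requested compute group exactly, after normalizing the request once.
     */
    static List<String> matchingTags(List<String> backendTags, String computeGroupName) {
        String normalized = normalize(computeGroupName);
        List<String> matches = new ArrayList<>();
        if (normalized == null) {
            return matches; // unset: a caller would fall back to default discovery
        }
        for (String tag : backendTags) {
            if (normalized.equals(tag)) {
                matches.add(tag);
            }
        }
        return matches;
    }
}
```

With this shape, `matchingTags(Arrays.asList("cg1", "cg2"), " cg1 ")` still finds `cg1`, because the padded input is normalized before the exact comparison.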
##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -456,8 +459,19 @@ private List<Backend> getBackends() throws Exception {
                 return new Backend(beNodeArr[0], Integer.valueOf(beNodeArr[1]), -1);
             }).collect(Collectors.toList());
         } else {
-            return frontend.getAliveBackends();
+            return frontend.getAliveBackends(getLoadTargetComputeGroup(properties));
+        }
+    }
+
+    static String getLoadTargetComputeGroup(Map<String, String> properties) {
+        if (properties == null) {
+            return null;
+        }
+        String computeGroup = properties.get(COMPUTE_GROUP);
+        if (StringUtils.isNotBlank(computeGroup)) {
+            return computeGroup;
         }
+        return properties.get(CLOUD_CLUSTER);
Review Comment:
`compute_group` / `cloud_cluster` values come from
`DorisConfig.getSinkProperties()` without trimming. Returning the raw value can
include leading/trailing whitespace, which will later cause compute-group
matching to fail (and can route stream load to the wrong backend set). Consider
trimming and treating all-whitespace values as unset.
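
One possible shape for this, sketched with the JDK only. The class name `LoadTargetSketch`, the `trimToNull` helper, and the literal key values `"compute_group"` and `"cloud_cluster"` are assumptions made for illustration; the connector's real constants may differ.

```java
import java.util.Map;

final class LoadTargetSketch {

    // Assumed key names, taken from the wording of the review comment.
    static final String COMPUTE_GROUP = "compute_group";
    static final String CLOUD_CLUSTER = "cloud_cluster";

    /** Returns the trimmed value, or null when it is null or only whitespace. */
    static String trimToNull(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    /** Prefers compute_group, falls back to cloud_cluster, and never returns padded or blank text. */
    static String getLoadTargetComputeGroup(Map<String, String> properties) {
        if (properties == null) {
            return null;
        }
        String computeGroup = trimToNull(properties.get(COMPUTE_GROUP));
        if (computeGroup != null) {
            return computeGroup;
        }
        return trimToNull(properties.get(CLOUD_CLUSTER));
    }
}
```

Because both lookups go through the same helper, an all-whitespace `compute_group` falls through to `cloud_cluster`, and an all-whitespace `cloud_cluster` yields `null` rather than a value that can never match a backend tag.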
##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java:
##########
@@ -378,6 +391,184 @@ public List<Backend> getAliveBackends() throws Exception {
         });
     }
+    private List<Backend> getManagerBackends(String computeGroupName) throws Exception {
+        return requestFrontends((frontend, client) -> {
+            String url = URLs.managerBackends(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled);
+            HttpGet httpGet = new HttpGet(url);
+            HttpUtils.setAuth(httpGet, username, password);
+            try (CloseableHttpResponse response = client.execute(httpGet)) {
+                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                    throw new RuntimeException("request fe with url: [" + url + "] failed with http code: "
+                            + response.getStatusLine().getStatusCode() + ", reason: "
+                            + response.getStatusLine().getReasonPhrase());
+                }
+                String entity = EntityUtils.toString(response.getEntity());
+                List<Backend> backends = parseManagerBackends(entity, computeGroupName);
+                Collections.shuffle(backends);
+                return backends;
+            } catch (IOException e) {
+                throw new RuntimeException("get manager backends failed", e);
+            }
+        });
+    }
+
+    static List<Backend> parseManagerBackends(String response, String computeGroupName) {
+        if (StringUtils.isBlank(computeGroupName)) {
+            throw managerBackendsException(computeGroupName, "compute group is empty");
+        }
+
+        JsonNode rootNode;
+        try {
+            rootNode = MAPPER.readTree(response);
+        } catch (IOException e) {
+            throw managerBackendsException(computeGroupName, "Parse Doris manager backend response to json failed. res: " + response);
+        }
Review Comment:
When JSON parsing fails, the thrown exception drops the original
`IOException` cause, so the parser's stack trace is lost and malformed or
permission-denied responses become harder to debug. Keep the generated message,
but pass the original exception as the cause.
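
A small sketch of cause chaining, under stated assumptions: the three-argument `managerBackendsException` overload is hypothetical (the PR only shows a two-argument form), its message format is invented for the example, and `parse` is a JDK-only stand-in for `MAPPER.readTree`.

```java
import java.io.IOException;

final class CausePreservingSketch {

    /** Hypothetical overload that carries the underlying failure as the cause. */
    static RuntimeException managerBackendsException(String computeGroupName, String message, Throwable cause) {
        return new RuntimeException(
                "get backends for compute group [" + computeGroupName + "] failed: " + message, cause);
    }

    /** Stand-in for a JSON parser: rejects anything that does not start with '{'. */
    static void parse(String response) throws IOException {
        if (response == null || !response.trim().startsWith("{")) {
            throw new IOException("unexpected start of response");
        }
    }

    static void parseOrThrow(String response, String computeGroupName) {
        try {
            parse(response);
        } catch (IOException e) {
            // Passing e as the cause keeps the parser's stack trace in the logs.
            throw managerBackendsException(computeGroupName,
                    "Parse Doris manager backend response to json failed. res: " + response, e);
        }
    }
}
```

Calling `parseOrThrow("<html>403</html>", "cg1")` then raises a `RuntimeException` whose `getCause()` is the original `IOException`.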
##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java:
##########
@@ -378,6 +391,184 @@ public List<Backend> getAliveBackends() throws Exception {
         });
     }
+    private List<Backend> getManagerBackends(String computeGroupName) throws Exception {
+        return requestFrontends((frontend, client) -> {
+            String url = URLs.managerBackends(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled);
+            HttpGet httpGet = new HttpGet(url);
+            HttpUtils.setAuth(httpGet, username, password);
+            try (CloseableHttpResponse response = client.execute(httpGet)) {
+                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                    throw new RuntimeException("request fe with url: [" + url + "] failed with http code: "
+                            + response.getStatusLine().getStatusCode() + ", reason: "
+                            + response.getStatusLine().getReasonPhrase());
+                }
+                String entity = EntityUtils.toString(response.getEntity());
+                List<Backend> backends = parseManagerBackends(entity, computeGroupName);
+                Collections.shuffle(backends);
+                return backends;
+            } catch (IOException e) {
+                throw new RuntimeException("get manager backends failed", e);
+            }
+        });
+    }
+
+    static List<Backend> parseManagerBackends(String response, String computeGroupName) {
+        if (StringUtils.isBlank(computeGroupName)) {
+            throw managerBackendsException(computeGroupName, "compute group is empty");
+        }
+
+        JsonNode rootNode;
+        try {
+            rootNode = MAPPER.readTree(response);
+        } catch (IOException e) {
+            throw managerBackendsException(computeGroupName, "Parse Doris manager backend response to json failed. res: " + response);
+        }
+
+        JsonNode dataNode = unwrapManagerBackendData(rootNode, computeGroupName);
+        JsonNode columnNode = dataNode.path("columnNames");
+        if (!columnNode.isArray()) {
+            columnNode = dataNode.path("column_names");
+        }
+        JsonNode rowNode = dataNode.path("rows");
+        if (!columnNode.isArray() || !rowNode.isArray()) {
+            throw managerBackendsException(computeGroupName, "response does not contain columnNames/column_names and rows");
+        }
+
+        Map<String, Integer> columnIndexes = getColumnIndexes(columnNode, computeGroupName);
+        int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName);
+        int httpPortIndex = requireColumn(columnIndexes, "HttpPort", computeGroupName);
+        int aliveIndex = requireColumn(columnIndexes, "Alive", computeGroupName);
+        int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName);
+
+        List<Backend> backends = new ArrayList<>();
+        for (JsonNode row : rowNode) {
+            if (!row.isArray()) {
+                throw managerBackendsException(computeGroupName, "backend row is not an array");
+            }
+            if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex))) {
+                continue;
+            }
+            String rowComputeGroupName = getComputeGroupNameFromTag(getManagerBackendCell(row, tagIndex));
+            if (!computeGroupName.equals(rowComputeGroupName)) {
+                continue;
+            }
+            String httpPort = getManagerBackendCell(row, httpPortIndex);
+            try {
+                backends.add(new Backend(getManagerBackendCell(row, hostIndex), Integer.parseInt(httpPort), -1));
+            } catch (NumberFormatException e) {
+                throw managerBackendsException(computeGroupName, "backend HttpPort is invalid: " + httpPort);
+            }
+        }
+
+        if (backends.isEmpty()) {
+            throw managerBackendsException(computeGroupName,
+                    "no alive backend found. If the target is a virtual compute group, configure its physical active compute group");
+        }
+        return backends;
+    }
+
+    private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String computeGroupName) {
+        if (rootNode.has("code") && rootNode.has("msg")) {
+            if (!"0".equalsIgnoreCase(rootNode.path("code").asText())) {
+                throw managerBackendsException(computeGroupName,
+                        rootNode.path("msg").asText() + ": " + rootNode.path("data").asText());
Review Comment:
For non-success manager responses, `rootNode.path("data").asText()` returns
an empty string when `data` is an object/array, which can hide useful error
details in the exception message. Using `toString()` preserves structured
content.
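
A hedged sketch of one way to apply this, assuming Jackson `jackson-databind` 2.x is on the classpath (as it is for the code under review). The class `ManagerErrorDetailSketch` and both helper names are hypothetical. The sketch calls `toString()` only for container nodes, since `toString()` on a text node would wrap the value in JSON quotes.

```java
import com.fasterxml.jackson.databind.JsonNode;

final class ManagerErrorDetailSketch {

    /**
     * Renders the "data" field for an error message: scalar values as plain
     * text, objects and arrays as their JSON representation.
     */
    static String describe(JsonNode data) {
        return data.isValueNode() ? data.asText() : data.toString();
    }

    /** Builds the "msg: data" detail string used in the exception message. */
    static String errorMessage(JsonNode rootNode) {
        return rootNode.path("msg").asText() + ": " + describe(rootNode.path("data"));
    }
}
```

For a response whose `data` is `{"reason":"no privilege"}` and whose `msg` is `denied`, `errorMessage` yields `denied: {"reason":"no privilege"}` instead of ending after the colon.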