Copilot commented on code in PR #361:
URL: https://github.com/apache/doris-spark-connector/pull/361#discussion_r3346626840


##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java:
##########
@@ -355,6 +361,13 @@ public String[] getTableAllColumns(String db, String table) throws Exception {
     }
 
     public List<Backend> getAliveBackends() throws Exception {
+        return getAliveBackends(null);
+    }
+
+    public List<Backend> getAliveBackends(String computeGroupName) throws Exception {
+        if (StringUtils.isNotBlank(computeGroupName)) {
+            return getManagerBackends(computeGroupName);
+        }

Review Comment:
   `computeGroupName` is used for an exact string match in backend filtering. 
If callers pass a value with surrounding whitespace, this will trigger 
manager-backend discovery but then fail to match tags (leading to a misleading 
"no alive backend" error). Trim the parameter once and use the trimmed value 
throughout.
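
   A minimal sketch of that normalization, using only the JDK. The class and helper names are hypothetical and not part of the PR; in the connector itself, commons-lang's `StringUtils.trimToNull` would likely serve the same purpose.

```java
class ComputeGroupNameSketch {

    // Hypothetical helper: returns the trimmed name, or null when the
    // input is null or consists only of whitespace.
    static String normalizeComputeGroup(String computeGroupName) {
        if (computeGroupName == null) {
            return null;
        }
        String trimmed = computeGroupName.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    // Stand-in for the exact tag match done during backend filtering:
    // the normalized name is computed once and reused for the comparison.
    static boolean matchesTag(String computeGroupName, String tagValue) {
        String normalized = normalizeComputeGroup(computeGroupName);
        return normalized != null && normalized.equals(tagValue);
    }
}
```

   With this in place, `getAliveBackends` could normalize the argument once, take the manager-backend path only when the result is non-null, and pass the normalized value down so the tag comparison sees the same string.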



##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java:
##########
@@ -456,8 +459,19 @@ private List<Backend> getBackends() throws Exception {
                 return new Backend(beNodeArr[0], Integer.valueOf(beNodeArr[1]), -1);
             }).collect(Collectors.toList());
         } else {
-            return frontend.getAliveBackends();
+            return frontend.getAliveBackends(getLoadTargetComputeGroup(properties));
+        }
+    }
+
+    static String getLoadTargetComputeGroup(Map<String, String> properties) {
+        if (properties == null) {
+            return null;
+        }
+        String computeGroup = properties.get(COMPUTE_GROUP);
+        if (StringUtils.isNotBlank(computeGroup)) {
+            return computeGroup;
         }
+        return properties.get(CLOUD_CLUSTER);

Review Comment:
   `compute_group` / `cloud_cluster` values come from 
`DorisConfig.getSinkProperties()` without trimming. Returning the raw value can 
include leading/trailing whitespace, which will later cause compute-group 
matching to fail (and can route stream load to the wrong backend set). Consider 
trimming and treating all-whitespace values as unset.
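
   One way this could look, as a sketch rather than the PR's actual change. The key strings `compute_group` and `cloud_cluster` are assumed from the wording above, and `trimToNull` is a local stand-in for the commons-lang method of the same name.

```java
import java.util.Map;

class LoadTargetComputeGroupSketch {

    // Assumed values for the COMPUTE_GROUP / CLOUD_CLUSTER constants.
    static final String COMPUTE_GROUP = "compute_group";
    static final String CLOUD_CLUSTER = "cloud_cluster";

    // Returns the trimmed value, or null for null / all-whitespace input.
    static String trimToNull(String value) {
        if (value == null) {
            return null;
        }
        String trimmed = value.trim();
        return trimmed.isEmpty() ? null : trimmed;
    }

    // Prefers compute_group; falls back to cloud_cluster when
    // compute_group is unset or blank. Both values are trimmed.
    static String getLoadTargetComputeGroup(Map<String, String> properties) {
        if (properties == null) {
            return null;
        }
        String computeGroup = trimToNull(properties.get(COMPUTE_GROUP));
        if (computeGroup != null) {
            return computeGroup;
        }
        return trimToNull(properties.get(CLOUD_CLUSTER));
    }
}
```

   Returning `null` for blank values keeps the existing fallback to the default alive-backend lookup intact.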



##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java:
##########
@@ -378,6 +391,184 @@ public List<Backend> getAliveBackends() throws Exception {
         });
     }
 
+    private List<Backend> getManagerBackends(String computeGroupName) throws Exception {
+        return requestFrontends((frontend, client) -> {
+            String url = URLs.managerBackends(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled);
+            HttpGet httpGet = new HttpGet(url);
+            HttpUtils.setAuth(httpGet, username, password);
+            try (CloseableHttpResponse response = client.execute(httpGet)) {
+                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                    throw new RuntimeException("request fe with url: [" + url + "] failed with http code: "
+                            + response.getStatusLine().getStatusCode() + ", reason: "
+                            + response.getStatusLine().getReasonPhrase());
+                }
+                String entity = EntityUtils.toString(response.getEntity());
+                List<Backend> backends = parseManagerBackends(entity, computeGroupName);
+                Collections.shuffle(backends);
+                return backends;
+            } catch (IOException e) {
+                throw new RuntimeException("get manager backends failed", e);
+            }
+        });
+    }
+
+    static List<Backend> parseManagerBackends(String response, String computeGroupName) {
+        if (StringUtils.isBlank(computeGroupName)) {
+            throw managerBackendsException(computeGroupName, "compute group is empty");
+        }
+
+        JsonNode rootNode;
+        try {
+            rootNode = MAPPER.readTree(response);
+        } catch (IOException e) {
+            throw managerBackendsException(computeGroupName, "Parse Doris manager backend response to json failed. res: " + response);
+        }

Review Comment:
   When JSON parsing fails, the thrown exception drops the original 
`IOException` cause, so its stack trace is lost. That makes malformed or 
permission-denied responses harder to debug. Keep the generated message, but 
pass the original `IOException` along as the cause of the thrown exception.
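
   A sketch of cause chaining under stated assumptions: the three-argument `managerBackendsException` overload and its message format are hypothetical, and `parseJson` is only a stand-in for `MAPPER.readTree(response)` so the example runs without Jackson.

```java
import java.io.IOException;

class ManagerBackendsCauseSketch {

    // Hypothetical overload that accepts a cause. The message format here
    // is illustrative, not the one used in the PR.
    static RuntimeException managerBackendsException(
            String computeGroupName, String detail, Throwable cause) {
        return new RuntimeException(
                "get backends of compute group [" + computeGroupName + "] failed: " + detail,
                cause);
    }

    // Stand-in for MAPPER.readTree(response): rejects non-JSON-looking input.
    static void parseJson(String response) throws IOException {
        if (response == null || !response.trim().startsWith("{")) {
            throw new IOException("response is not a JSON object");
        }
    }

    static void parseManagerBackends(String response, String computeGroupName) {
        try {
            parseJson(response);
        } catch (IOException e) {
            // Passing e as the cause keeps the original stack trace reachable.
            throw managerBackendsException(computeGroupName,
                    "Parse Doris manager backend response to json failed. res: " + response, e);
        }
    }
}
```

   Callers that log the exception then see both the generated message and a "Caused by" section for the parser failure.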



##########
spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java:
##########
@@ -378,6 +391,184 @@ public List<Backend> getAliveBackends() throws Exception {
         });
     }
 
+    private List<Backend> getManagerBackends(String computeGroupName) throws Exception {
+        return requestFrontends((frontend, client) -> {
+            String url = URLs.managerBackends(frontend.getHost(), frontend.getHttpPort(), isHttpsEnabled);
+            HttpGet httpGet = new HttpGet(url);
+            HttpUtils.setAuth(httpGet, username, password);
+            try (CloseableHttpResponse response = client.execute(httpGet)) {
+                if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+                    throw new RuntimeException("request fe with url: [" + url + "] failed with http code: "
+                            + response.getStatusLine().getStatusCode() + ", reason: "
+                            + response.getStatusLine().getReasonPhrase());
+                }
+                String entity = EntityUtils.toString(response.getEntity());
+                List<Backend> backends = parseManagerBackends(entity, computeGroupName);
+                Collections.shuffle(backends);
+                return backends;
+            } catch (IOException e) {
+                throw new RuntimeException("get manager backends failed", e);
+            }
+        });
+    }
+
+    static List<Backend> parseManagerBackends(String response, String computeGroupName) {
+        if (StringUtils.isBlank(computeGroupName)) {
+            throw managerBackendsException(computeGroupName, "compute group is empty");
+        }
+
+        JsonNode rootNode;
+        try {
+            rootNode = MAPPER.readTree(response);
+        } catch (IOException e) {
+            throw managerBackendsException(computeGroupName, "Parse Doris manager backend response to json failed. res: " + response);
+        }
+
+        JsonNode dataNode = unwrapManagerBackendData(rootNode, computeGroupName);
+        JsonNode columnNode = dataNode.path("columnNames");
+        if (!columnNode.isArray()) {
+            columnNode = dataNode.path("column_names");
+        }
+        JsonNode rowNode = dataNode.path("rows");
+        if (!columnNode.isArray() || !rowNode.isArray()) {
+            throw managerBackendsException(computeGroupName, "response does not contain columnNames/column_names and rows");
+        }
+
+        Map<String, Integer> columnIndexes = getColumnIndexes(columnNode, computeGroupName);
+        int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName);
+        int httpPortIndex = requireColumn(columnIndexes, "HttpPort", computeGroupName);
+        int aliveIndex = requireColumn(columnIndexes, "Alive", computeGroupName);
+        int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName);
+
+        List<Backend> backends = new ArrayList<>();
+        for (JsonNode row : rowNode) {
+            if (!row.isArray()) {
+                throw managerBackendsException(computeGroupName, "backend row is not an array");
+            }
+            if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex))) {
+                continue;
+            }
+            String rowComputeGroupName = getComputeGroupNameFromTag(getManagerBackendCell(row, tagIndex));
+            if (!computeGroupName.equals(rowComputeGroupName)) {
+                continue;
+            }
+            String httpPort = getManagerBackendCell(row, httpPortIndex);
+            try {
+                backends.add(new Backend(getManagerBackendCell(row, hostIndex), Integer.parseInt(httpPort), -1));
+            } catch (NumberFormatException e) {
+                throw managerBackendsException(computeGroupName, "backend HttpPort is invalid: " + httpPort);
+            }
+        }
+
+        if (backends.isEmpty()) {
+            throw managerBackendsException(computeGroupName,
+                    "no alive backend found. If the target is a virtual compute group, configure its physical active compute group");
+        }
+        return backends;
+    }
+
+    private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String computeGroupName) {
+        if (rootNode.has("code") && rootNode.has("msg")) {
+            if (!"0".equalsIgnoreCase(rootNode.path("code").asText())) {
+                throw managerBackendsException(computeGroupName,
+                        rootNode.path("msg").asText() + ": " + rootNode.path("data").asText());

Review Comment:
   For non-success manager responses, `rootNode.path("data").asText()` returns 
an empty string when `data` is an object/array, which can hide useful error 
details in the exception message. Using `toString()` preserves structured 
content.


