Copilot commented on code in PR #655:
URL:
https://github.com/apache/doris-flink-connector/pull/655#discussion_r3216113237
##########
flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java:
##########
@@ -389,6 +444,184 @@ static List<BackendRowV2> parseBackendV2(String response,
Logger logger) {
return backendRows;
}
+ @VisibleForTesting
+ static List<BackendRowV2> parseManagerBackends(
+ String response, Logger logger, String computeGroupName) {
+ if (StringUtils.isBlank(computeGroupName)) {
+ throw managerBackendsException(computeGroupName, "compute group is
empty");
+ }
+
+ JsonNode rootNode = parseJsonResponse(response, computeGroupName,
logger);
+ JsonNode dataNode = unwrapManagerBackendData(rootNode,
computeGroupName);
+ JsonNode columnNode = dataNode.path("columnNames");
+ if (!columnNode.isArray()) {
+ columnNode = dataNode.path("column_names");
+ }
+ JsonNode rowNode = dataNode.path("rows");
+ if (!columnNode.isArray() || !rowNode.isArray()) {
+ throw managerBackendsException(
+ computeGroupName,
+ "response does not contain columnNames/column_names and
rows");
+ }
+
+ Map<String, Integer> columnIndexes = getColumnIndexes(columnNode,
computeGroupName);
+ int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName);
+ int httpPortIndex = requireColumn(columnIndexes, "HttpPort",
computeGroupName);
+ int aliveIndex = requireColumn(columnIndexes, "Alive",
computeGroupName);
+ int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName);
+
+ List<BackendRowV2> backends = new ArrayList<>();
+ for (JsonNode row : rowNode) {
+ if (!row.isArray()) {
+ throw managerBackendsException(computeGroupName, "backend row
is not an array");
+ }
+ if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex)))
{
+ continue;
+ }
+ String tag = getManagerBackendCell(row, tagIndex);
+ String rowComputeGroupName = getComputeGroupNameFromTag(tag);
+ if (!computeGroupName.equals(rowComputeGroupName)) {
+ continue;
+ }
+
+ String host = getManagerBackendCell(row, hostIndex);
+ String httpPort = getManagerBackendCell(row, httpPortIndex);
+ try {
+ BackendRowV2 backend = BackendRowV2.of(host,
Integer.parseInt(httpPort), true);
+ backends.add(backend);
+ } catch (NumberFormatException e) {
+ throw managerBackendsException(
+ computeGroupName, "backend HttpPort is invalid: " +
httpPort);
+ }
+ }
+
+ if (backends.isEmpty()) {
+ throw managerBackendsException(
+ computeGroupName,
+ "no alive backend found. If the target is a virtual
compute group, configure its physical active compute group");
+ }
+ logger.debug("Parsing manager backend result is '{}'.", backends);
+ return backends;
+ }
+
+ private static JsonNode parseJsonResponse(
+ String response, String computeGroupName, Logger logger) {
+ try {
+ return objectMapper.readTree(response);
+ } catch (IOException e) {
+ String errMsg = "Parse Doris manager backend response to json
failed. res: " + response;
+ logger.error(errMsg, e);
+ throw managerBackendsException(computeGroupName, errMsg);
+ }
+ }
+
+ private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String
computeGroupName) {
+ if (rootNode.has("code") && rootNode.has("msg")) {
+ if (rootNode.path("code").asInt() != REST_RESPONSE_CODE_OK) {
+ throw managerBackendsException(
+ computeGroupName,
+ rootNode.path("msg").asText() + ": " +
rootNode.path("data").asText());
+ }
+ return rootNode.path("data");
Review Comment:
In `unwrapManagerBackendData`, the error path builds `msg + ": " +
rootNode.path("data").asText()`. If `data` is an object/array node (not a JSON
string), `asText()` will return an empty string, which can hide useful error
details. Consider using `data.toString()` (or otherwise serializing `data`)
when it isn't textual so the exception message remains informative.
##########
flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java:
##########
@@ -86,10 +84,7 @@ public DorisCommitter(
this.ignoreCommitError = executionOptions.ignoreCommitError();
this.httpClient = client;
this.backendUtil =
- StringUtils.isNotEmpty(dorisOptions.getBenodes())
- ? new BackendUtil(dorisOptions.getBenodes())
- : new BackendUtil(
- RestService.getBackendsV2(dorisOptions,
dorisReadOptions, LOG));
+ BackendUtil.getInstance(dorisOptions, dorisReadOptions,
executionOptions, LOG);
Review Comment:
`Preconditions.checkArgument(maxRetry >= 0)` is validating the field default
(0) instead of the configured value from `executionOptions.getMaxRetries()`.
This allows negative max retries to slip through, which can cause
`commitTransaction` to skip the retry loop entirely and silently not commit.
Validate `executionOptions.getMaxRetries()` (or assign `this.maxRetry` first
and validate that) instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]