This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 74585c3 [improve] add code check for fe request (#309)
74585c3 is described below
commit 74585c3b6d123bf6b77e427211754822d3cbcf7b
Author: LeiWang <[email protected]>
AuthorDate: Thu Apr 24 15:55:40 2025 +0800
[improve] add code check for fe request (#309)
---
.../doris/spark/client/DorisFrontendClient.java | 57 +++++++++++-----------
1 file changed, 29 insertions(+), 28 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index cbc1352..cefb063 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -29,7 +29,6 @@ import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.util.HttpUtils;
import org.apache.doris.spark.util.URLs;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
@@ -103,20 +102,17 @@ public class DorisFrontendClient implements Serializable {
for (String frontendNode : frontendNodeArray) {
String[] nodeDetails = frontendNode.split(":");
try {
- List<Frontend> list = Collections.singletonList(new
Frontend(nodeDetails[0], nodeDetails.length > 1 ?
Integer.parseInt(nodeDetails[1]) : -1));
+ List<Frontend> list = Collections.singletonList(new
Frontend(nodeDetails[0],
+ nodeDetails.length > 1 ?
Integer.parseInt(nodeDetails[1]) : -1));
frontendList = requestFrontends(list, (frontend, client)
-> {
- HttpGet httpGet = new
HttpGet(URLs.getFrontEndNodes(frontend.getHost(), frontend.getHttpPort(),
isHttpsEnabled));
+ String url = URLs.getFrontEndNodes(frontend.getHost(),
frontend.getHttpPort(),
+ isHttpsEnabled);
+ HttpGet httpGet = new HttpGet(url);
HttpUtils.setAuth(httpGet, username, password);
JsonNode dataNode;
try {
HttpResponse response = client.execute(httpGet);
- if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
- throw new RuntimeException("fetch fe request failed, status: "
- +
response.getStatusLine().getStatusCode()
- + ", reason: " +
response.getStatusLine().getReasonPhrase());
- }
- String entity =
EntityUtils.toString(response.getEntity());
- dataNode = extractEntity(entity, "data");
+ dataNode = extractDataFromResponse(response, url);
} catch (IOException e) {
throw new RuntimeException("fetch fe failed", e);
}
@@ -250,17 +246,14 @@ public class DorisFrontendClient implements Serializable {
public Schema getTableSchema(String db, String table) throws Exception {
return requestFrontends((frontend, httpClient) -> {
- HttpGet httpGet = new HttpGet(URLs.tableSchema(frontend.getHost(),
frontend.getHttpPort(), db, table, isHttpsEnabled));
+ String url = URLs.tableSchema(frontend.getHost(),
frontend.getHttpPort(), db, table, isHttpsEnabled);
+ HttpGet httpGet = new HttpGet(url);
HttpUtils.setAuth(httpGet, username, password);
Schema dorisSchema;
try {
HttpResponse response = httpClient.execute(httpGet);
- if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
- throw new RuntimeException("table schema request failed, code: " + response.getStatusLine().getStatusCode()
- + ", reason: " +
response.getStatusLine().getReasonPhrase());
- }
- String entity = EntityUtils.toString(response.getEntity());
- dorisSchema = MAPPER.readValue(extractEntity(entity,
"data").traverse(), Schema.class);
+ JsonNode dataNode = extractDataFromResponse(response, url);
+ dorisSchema = MAPPER.readValue(dataNode.traverse(),
Schema.class);
} catch (IOException e) {
throw new RuntimeException("table schema request failed", e);
}
@@ -311,7 +304,8 @@ public class DorisFrontendClient implements Serializable {
public QueryPlan getQueryPlan(String database, String table, String sql)
throws Exception {
return requestFrontends((frontend, httpClient) -> {
try {
- HttpPost httpPost = new
HttpPost(URLs.queryPlan(frontend.getHost(), frontend.getHttpPort(), database,
table, isHttpsEnabled));
+ String url = URLs.queryPlan(frontend.getHost(),
frontend.getHttpPort(), database, table, isHttpsEnabled);
+ HttpPost httpPost = new HttpPost(url);
HttpUtils.setAuth(httpPost, username, password);
String body = MAPPER.writeValueAsString(ImmutableMap.of("sql",
sql));
StringEntity stringEntity = new StringEntity(body,
StandardCharsets.UTF_8);
@@ -319,12 +313,7 @@ public class DorisFrontendClient implements Serializable {
stringEntity.setContentType("application/json");
httpPost.setEntity(stringEntity);
HttpResponse response = httpClient.execute(httpPost);
- if (response.getStatusLine().getStatusCode() !=
HttpStatus.SC_OK) {
- throw new DorisException("query plan request failed, code: " + response.getStatusLine().getStatusCode()
- + ", reason: " +
response.getStatusLine().getReasonPhrase());
- }
- String entity = EntityUtils.toString(response.getEntity());
- JsonNode dataJsonNode = extractEntity(entity, "data");
+ JsonNode dataJsonNode = extractDataFromResponse(response, url);
if (dataJsonNode.get("exception") != null) {
throw new DorisException("query plan failed, exception: "
+ dataJsonNode.get("exception").asText());
}
@@ -335,8 +324,20 @@ public class DorisFrontendClient implements Serializable {
});
}
- private JsonNode extractEntity(String entityStr, String fieldName) throws
JsonProcessingException {
- return MAPPER.readTree(entityStr).get(fieldName);
+
+ private JsonNode extractDataFromResponse(HttpResponse response, String
url) throws IOException {
+ if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
+ throw new RuntimeException("request fe with url: [" + url + "] failed with http code: "
+ + response.getStatusLine().getStatusCode() + ", reason: "
+ + response.getStatusLine().getReasonPhrase());
+ }
+ String entity = EntityUtils.toString(response.getEntity());
+ JsonNode respNode = MAPPER.readTree(entity);
+ String code = respNode.get("code").asText();
+ if (!"0".equalsIgnoreCase(code)) {
+ throw new RuntimeException("fetch fe url:[" + url + "] failed with invalid msg code, response: " + entity);
+ }
+ return respNode.get("data");
}
public String[] getTableAllColumns(String db, String table) throws
Exception {
@@ -352,8 +353,8 @@ public class DorisFrontendClient implements Serializable {
ArrayNode backendsNode;
try {
CloseableHttpResponse res = client.execute(httpGet);
- String content = EntityUtils.toString(res.getEntity());
- backendsNode = (ArrayNode) extractEntity(content,
"data").get("backends");
+ JsonNode dataNode = extractDataFromResponse(res, url);
+ backendsNode = (ArrayNode) dataNode.get("backends");
} catch (IOException e) {
throw new RuntimeException("get alive backends failed", e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]