This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9c36c45819 [Feature][Connector-V2]Support StarRocks Fe Node HA
9c36c45819 is described below
commit 9c36c45819f4bde1a981103808f705301f66378c
Author: xiaochen <[email protected]>
AuthorDate: Mon May 6 21:28:26 2024 +0800
[Feature][Connector-V2]Support StarRocks Fe Node HA
---
.../source/StarRocksQueryPlanReadClient.java | 55 +++++++++++-----------
1 file changed, 27 insertions(+), 28 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java
index 315519424c..85458f7151 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java
@@ -38,7 +38,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
@Slf4j
@@ -64,8 +63,7 @@ public class StarRocksQueryPlanReadClient {
public List<QueryPartition> findPartitions() {
List<String> nodeUrls = sourceConfig.getNodeUrls();
- QueryPlan queryPlan =
- getQueryPlan(genQuerySql(), nodeUrls.get(new Random().nextInt(nodeUrls.size())));
+ QueryPlan queryPlan = getQueryPlan(genQuerySql(), nodeUrls);
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan);
return tabletsMapToPartition(
be2Tablets,
@@ -136,37 +134,38 @@ public class StarRocksQueryPlanReadClient {
return beXTablets;
}
- private QueryPlan getQueryPlan(String querySQL, String httpNode) {
- String url =
- new StringBuilder("http://")
- .append(httpNode)
- .append("/api/")
- .append(sourceConfig.getDatabase())
- .append("/")
- .append(sourceConfig.getTable())
- .append("/_query_plan")
- .toString();
+ private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
Map<String, Object> bodyMap = new HashMap<>();
bodyMap.put("sql", querySQL);
String body = JsonUtils.toJsonString(bodyMap);
- String respString;
- try {
- respString =
- RetryUtils.retryWithException(
- () -> httpHelper.doHttpPost(url, getQueryPlanHttpHeader(), body),
- retryMaterial);
- } catch (Exception e) {
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED, e);
+ String respString = "";
+ for (String feNode : nodeUrls) {
+ String url =
+ new StringBuilder("http://")
+ .append(feNode)
+ .append("/api/")
+ .append(sourceConfig.getDatabase())
+ .append("/")
+ .append(sourceConfig.getTable())
+ .append("/_query_plan")
+ .toString();
+ try {
+ respString =
+ RetryUtils.retryWithException(
+ () -> httpHelper.doHttpPost(url, getQueryPlanHttpHeader(), body),
+ retryMaterial);
+ if (StringUtils.isNoneEmpty(respString)) {
+ return JsonUtils.parseObject(respString, QueryPlan.class);
+ }
+ } catch (Exception e) {
+ log.error("Request query Plan From {} failed: {}", feNode, e.getMessage());
+ }
}
- if (StringUtils.isEmpty(respString)) {
- throw new StarRocksConnectorException(
- StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED,
- "query failed with empty response");
- }
- return JsonUtils.parseObject(respString, QueryPlan.class);
+ throw new StarRocksConnectorException(
+ StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED,
+ "query failed with empty response");
}
private String getBasicAuthHeader(String username, String password) {