This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9c36c45819 [Feature][Connector-V2]Support StarRocks Fe Node HA
9c36c45819 is described below

commit 9c36c45819f4bde1a981103808f705301f66378c
Author: xiaochen <[email protected]>
AuthorDate: Mon May 6 21:28:26 2024 +0800

    [Feature][Connector-V2]Support StarRocks Fe Node HA
---
 .../source/StarRocksQueryPlanReadClient.java       | 55 +++++++++++-----------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java
index 315519424c..85458f7151 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksQueryPlanReadClient.java
@@ -38,7 +38,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.Set;
 
 @Slf4j
@@ -64,8 +63,7 @@ public class StarRocksQueryPlanReadClient {
 
     public List<QueryPartition> findPartitions() {
         List<String> nodeUrls = sourceConfig.getNodeUrls();
-        QueryPlan queryPlan =
-                getQueryPlan(genQuerySql(), nodeUrls.get(new Random().nextInt(nodeUrls.size())));
+        QueryPlan queryPlan = getQueryPlan(genQuerySql(), nodeUrls);
         Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan);
         return tabletsMapToPartition(
                 be2Tablets,
@@ -136,37 +134,38 @@ public class StarRocksQueryPlanReadClient {
         return beXTablets;
     }
 
-    private QueryPlan getQueryPlan(String querySQL, String httpNode) {
-        String url =
-                new StringBuilder("http://")
-                        .append(httpNode)
-                        .append("/api/")
-                        .append(sourceConfig.getDatabase())
-                        .append("/")
-                        .append(sourceConfig.getTable())
-                        .append("/_query_plan")
-                        .toString();
+    private QueryPlan getQueryPlan(String querySQL, List<String> nodeUrls) {
 
         Map<String, Object> bodyMap = new HashMap<>();
         bodyMap.put("sql", querySQL);
         String body = JsonUtils.toJsonString(bodyMap);
-        String respString;
-        try {
-            respString =
-                    RetryUtils.retryWithException(
-                            () -> httpHelper.doHttpPost(url, getQueryPlanHttpHeader(), body),
-                            retryMaterial);
-        } catch (Exception e) {
-            throw new StarRocksConnectorException(
-                    StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED, e);
+        String respString = "";
+        for (String feNode : nodeUrls) {
+            String url =
+                    new StringBuilder("http://")
+                            .append(feNode)
+                            .append("/api/")
+                            .append(sourceConfig.getDatabase())
+                            .append("/")
+                            .append(sourceConfig.getTable())
+                            .append("/_query_plan")
+                            .toString();
+            try {
+                respString =
+                        RetryUtils.retryWithException(
+                                () -> httpHelper.doHttpPost(url, getQueryPlanHttpHeader(), body),
+                                retryMaterial);
+                if (StringUtils.isNoneEmpty(respString)) {
+                    return JsonUtils.parseObject(respString, QueryPlan.class);
+                }
+            } catch (Exception e) {
+                log.error("Request query Plan From {} failed: {}", feNode, e.getMessage());
+            }
         }
 
-        if (StringUtils.isEmpty(respString)) {
-            throw new StarRocksConnectorException(
-                    StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED,
-                    "query failed with empty response");
-        }
-        return JsonUtils.parseObject(respString, QueryPlan.class);
+        throw new StarRocksConnectorException(
+                StarRocksConnectorErrorCode.QUEST_QUERY_PLAN_FAILED,
+                "query failed with empty response");
     }
 
     private String getBasicAuthHeader(String username, String password) {

Reply via email to