This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dce3f87f9 [INLONG-8786][Sort] Fix doris schema change in case of 
multiple uri  (#8787)
7dce3f87f9 is described below

commit 7dce3f87f9293c9da326d14fa89fbf614cedcd93
Author: Yizhou Yang <[email protected]>
AuthorDate: Mon Aug 28 10:56:12 2023 +0800

    [INLONG-8786][Sort] Fix doris schema change in case of multiple uri  (#8787)
    
    Co-authored-by: Yizhou Yang <[email protected]>
---
 .../sort/doris/schema/SchemaChangeHelper.java      | 24 ++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
index effae22e1d..0eec4921c4 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -59,6 +60,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.StringJoiner;
+import java.util.stream.Collectors;
 
 /**
  * Schema change helper
@@ -415,12 +417,22 @@ public class SchemaChangeHelper {
     private boolean executeStatement(String database, String stmt) throws 
IOException {
         Map<String, String> param = new HashMap<>();
         param.put("stmt", stmt);
-        String requestUrl = String.format(SCHEMA_CHANGE_API, 
options.getFenodes(), database);
-        HttpPost httpPost = new HttpPost(requestUrl);
-        httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
-        httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
-        httpPost.setEntity(new 
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
-        return sendRequest(httpPost);
+        List<String> fenodes = Arrays.asList(options.getFenodes().split(","));
+        List<String> uris = fenodes.stream()
+                .map(fenode -> String.format(SCHEMA_CHANGE_API, fenode, 
database))
+                .collect(Collectors.toList());
+
+        for (String requestUrl : uris) {
+            HttpPost httpPost = new HttpPost(requestUrl);
+            httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+            httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
+            httpPost.setEntity(new 
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+            // if any fenode succeeds, return true, else keep trying
+            if (sendRequest(httpPost)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     private boolean checkLightSchemaChange(String database, String table, 
String column, boolean dropColumn)

Reply via email to