This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7dce3f87f9 [INLONG-8786][Sort] Fix doris schema change in case of
multiple uri (#8787)
7dce3f87f9 is described below
commit 7dce3f87f9293c9da326d14fa89fbf614cedcd93
Author: Yizhou Yang <[email protected]>
AuthorDate: Mon Aug 28 10:56:12 2023 +0800
[INLONG-8786][Sort] Fix doris schema change in case of multiple uri (#8787)
Co-authored-by: Yizhou Yang <[email protected]>
---
.../sort/doris/schema/SchemaChangeHelper.java | 24 ++++++++++++++++------
1 file changed, 18 insertions(+), 6 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
index effae22e1d..0eec4921c4 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/schema/SchemaChangeHelper.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -59,6 +60,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.StringJoiner;
+import java.util.stream.Collectors;
/**
* Schema change helper
@@ -415,12 +417,22 @@ public class SchemaChangeHelper {
private boolean executeStatement(String database, String stmt) throws
IOException {
Map<String, String> param = new HashMap<>();
param.put("stmt", stmt);
- String requestUrl = String.format(SCHEMA_CHANGE_API,
options.getFenodes(), database);
- HttpPost httpPost = new HttpPost(requestUrl);
- httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
- httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
- httpPost.setEntity(new
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
- return sendRequest(httpPost);
+ List<String> fenodes = Arrays.asList(options.getFenodes().split(","));
+ List<String> uris = fenodes.stream()
+ .map(fenode -> String.format(SCHEMA_CHANGE_API, fenode,
database))
+ .collect(Collectors.toList());
+
+ for (String requestUrl : uris) {
+ HttpPost httpPost = new HttpPost(requestUrl);
+ httpPost.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
+ httpPost.setHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE_JSON);
+ httpPost.setEntity(new
StringEntity(dynamicSchemaFormat.objectMapper.writeValueAsString(param)));
+ // if any fenode succeeds, return true, else keep trying
+ if (sendRequest(httpPost)) {
+ return true;
+ }
+ }
+ return false;
}
private boolean checkLightSchemaChange(String database, String table,
String column, boolean dropColumn)