This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8ba89c44 [fix]Fix the error of missing content returned by schema
change response (#433)
8ba89c44 is described below
commit 8ba89c4488bb7b639566b519da58c698f1e13b2f
Author: wudongliang <[email protected]>
AuthorDate: Wed Jul 17 17:22:44 2024 +0800
[fix]Fix the error of missing content returned by schema change response
(#433)
---
.../doris/flink/sink/schema/SchemaChangeManager.java | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
index 27f2aece..c946bee7 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.sink.schema;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.StringUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.codec.binary.Base64;
@@ -198,12 +199,12 @@ public class SchemaChangeManager implements Serializable {
httpGet.setHeader(HttpHeaders.AUTHORIZATION, authHeader());
httpGet.setEntity(
new StringEntity(objectMapper.writeValueAsString(params),
charsetEncoding));
- String responseEntity = "";
- Map<String, Object> responseMap = handleResponse(httpGet,
responseEntity);
- return handleSchemaChange(responseMap, responseEntity);
+ String responseEntity = handleResponse(httpGet);
+ return handleSchemaChange(responseEntity);
}
- private boolean handleSchemaChange(Map<String, Object> responseMap, String
responseEntity) {
+ private boolean handleSchemaChange(String responseEntity) throws
JsonProcessingException {
+ Map<String, Object> responseMap =
objectMapper.readValue(responseEntity, Map.class);
String code = responseMap.getOrDefault("code", "-1").toString();
if (code.equals("0")) {
return true;
@@ -221,9 +222,8 @@ public class SchemaChangeManager implements Serializable {
}
LOG.info("Execute SQL: {}", ddl);
HttpPost httpPost = buildHttpPost(ddl, database);
- String responseEntity = "";
- Map<String, Object> responseMap = handleResponse(httpPost,
responseEntity);
- return handleSchemaChange(responseMap, responseEntity);
+ String responseEntity = handleResponse(httpPost);
+ return handleSchemaChange(responseEntity);
}
public HttpPost buildHttpPost(String ddl, String database)
@@ -245,15 +245,13 @@ public class SchemaChangeManager implements Serializable {
return httpPost;
}
- private Map<String, Object> handleResponse(HttpUriRequest request, String
responseEntity) {
+ private String handleResponse(HttpUriRequest request) {
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
CloseableHttpResponse response = httpclient.execute(request);
final int statusCode = response.getStatusLine().getStatusCode();
final String reasonPhrase =
response.getStatusLine().getReasonPhrase();
if (statusCode == 200 && response.getEntity() != null) {
- responseEntity = EntityUtils.toString(response.getEntity());
- Map<String, Object> responseMap =
objectMapper.readValue(responseEntity, Map.class);
- return responseMap;
+ return EntityUtils.toString(response.getEntity());
} else {
throw new DorisSchemaChangeException(
"Failed to schemaChange, status: "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]