This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ca71e9f8a [INLONG-7767][Sort] Doris connector does not really delete
records because the columns header is lost (#7768)
ca71e9f8a is described below
commit ca71e9f8a20fdedd721c6d49ceea8fcba605a250
Author: Liao Rui <[email protected]>
AuthorDate: Mon Apr 3 18:11:21 2023 +0800
[INLONG-7767][Sort] Doris connector does not really delete records because
the columns header is lost (#7768)
---
.../inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index c056a92ee..db446f0ce 100644
---
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -688,7 +688,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
try {
// support csv and json format
String format =
executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY, FORMAT_JSON_VALUE);
- loadValue = serialize(values, format);
+ loadValue = serialize(tableIdentifier, values, format);
respContent = load(tableIdentifier, loadValue);
try {
if (null != metricData && null != respContent) {
@@ -785,7 +785,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
* @return string
* @throws JsonProcessingException
*/
- private String serialize(List values, String format) throws
JsonProcessingException {
+ private String serialize(String tableIdentifier, List values, String
format) throws JsonProcessingException {
if (FORMAT_CSV_VALUE.equalsIgnoreCase(format)) {
LOG.info("doris data format: {}", format);
// set columns, and format json data to csv
@@ -822,6 +822,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
return csvData.toString();
} else {
+ // Dynamic set COLUMNS_KEY for tableIdentifier every time for
multiple sink scenario
+ if (multipleSink) {
+ executionOptions.getStreamLoadProp().put(COLUMNS_KEY,
columnsMap.get(tableIdentifier));
+ }
return OBJECT_MAPPER.writeValueAsString(values);
}
}