This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0039d3ed9 [INLONG-7584][Sort] Doris connector supports writing CSV and 
archiving dirty data (#7585)
0039d3ed9 is described below

commit 0039d3ed92fd12d5d04e7d4472af69d18cfa7337
Author: Liao Rui <[email protected]>
AuthorDate: Wed Mar 15 16:13:51 2023 +0800

    [INLONG-7584][Sort] Doris connector supports writing CSV and archiving 
dirty data (#7585)
    
    Co-authored-by: ryanrliao <[email protected]>
---
 .../table/DorisDynamicSchemaOutputFormat.java      | 380 ++++++++++++++-------
 .../sort/doris/table/DorisDynamicTableFactory.java |  22 +-
 .../sort/doris/table/DorisDynamicTableSink.java    |  23 +-
 .../inlong/sort/doris/table/DorisStreamLoad.java   |   5 +-
 4 files changed, 304 insertions(+), 126 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 2ee979d15..c056a92ee 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.sort.doris.table;
 
+import java.util.LinkedHashSet;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
@@ -25,7 +27,6 @@ import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Schema;
-import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -35,15 +36,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
@@ -51,6 +54,7 @@ import 
org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.doris.model.RespContent;
 import org.apache.inlong.sort.doris.util.DorisParseUtils;
@@ -94,6 +98,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final String COLUMNS_KEY = "columns";
     private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
+    private static final String DIRTY_LOG_TAG = "__DIRTY_LOG_TAG__";
+    private static final String DIRTY_LABEL = "__DIRTY_LABEL__";
+    private static final String DIRTY_IDENTIFIER = "__DIRTY_IDENTIFIER__";
+    private static final String DATABASE = "__DATABASE__";
+    private static final String TABLE = "__TABLE__";
     /**
      * Mark the record for delete
      */
@@ -102,6 +111,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
      * Mark the record for not delete
      */
     private static final String DORIS_DELETE_FALSE = "0";
+    private static final String FORMAT_JSON_VALUE = "json";
+    private static final String FORMAT_CSV_VALUE = "csv";
+    private static final String FORMAT_KEY = "format";
     private static final String FIELD_DELIMITER_KEY = "column_separator";
     private static final String FIELD_DELIMITER_DEFAULT = "\t";
     private static final String LINE_DELIMITER_KEY = "line_delimiter";
@@ -113,6 +125,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     @SuppressWarnings({"rawtypes"})
     private final Map<String, List> batchMap = new HashMap<>();
     private final Map<String, String> columnsMap = new HashMap<>();
+    /**
+     * data will not be submitted when table is in errorTables list
+     */
     private final List<String> errorTables = new ArrayList<>();
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
@@ -130,6 +145,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     private final String tablePattern;
     private final String dynamicSchemaFormat;
     private final boolean ignoreSingleTableErrors;
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
     private long batchBytes = 0L;
     private int size;
     private DorisStreamLoad dorisStreamLoad;
@@ -146,9 +162,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     private volatile RowData.FieldGetter[] fieldGetters;
     private String fieldDelimiter;
     private String lineDelimiter;
+    private String columns;
     private final LogicalType[] logicalTypes;
-    private final DirtyOptions dirtyOptions;
-    private @Nullable final DirtySink<Object> dirtySink;
+    private DirtySinkHelper<Object> dirtySinkHelper;
     private transient Schema schema;
 
     public DorisDynamicSchemaOutputFormat(DorisOptions option,
@@ -161,6 +177,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             String databasePattern,
             String tablePattern,
             boolean ignoreSingleTableErrors,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String inlongMetric,
             String auditHostAndPorts,
             boolean multipleSink,
@@ -179,8 +196,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
         this.ignoreSingleTableErrors = ignoreSingleTableErrors;
-        this.dirtyOptions = dirtyOptions;
-        this.dirtySink = dirtySink;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
+        this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+
+        handleStreamLoadProp();
     }
 
     /**
@@ -203,18 +222,23 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             props.remove(ESCAPE_DELIMITERS_KEY);
         }
 
-        // add column key when fieldNames is not empty
-        if (!props.containsKey(COLUMNS_KEY) && fieldNames != null && 
fieldNames.length > 0) {
-            String columns = Arrays.stream(fieldNames)
-                    .map(item -> String.format("`%s`", 
item.trim().replace("`", "")))
-                    .collect(Collectors.joining(","));
-            props.put(COLUMNS_KEY, columns);
-        }
+        // save `sink.properties.columns` parameter from options
+        this.columns = (String) props.get(COLUMNS_KEY);
+        if (!multipleSink) {
+            // add column key when fieldNames is not empty
+            if (!props.containsKey(COLUMNS_KEY) && fieldNames != null && 
fieldNames.length > 0) {
+                String columns =
+                        Arrays.stream(fieldNames).map(item -> 
String.format("`%s`", item.trim().replace("`", "")))
+                                .collect(Collectors.joining(","));
+                props.put(COLUMNS_KEY, columns);
+            }
 
-        // if enable batch delete, the columns must add tag 
'__DORIS_DELETE_SIGN__'
-        String columns = (String) props.get(COLUMNS_KEY);
-        if (!columns.contains(DORIS_DELETE_SIGN) && enableBatchDelete()) {
-            props.put(COLUMNS_KEY, String.format("%s,%s", columns, 
DORIS_DELETE_SIGN));
+            // if enable batch delete, the columns must add tag 
'__DORIS_DELETE_SIGN__'
+            String columns = (String) props.get(COLUMNS_KEY);
+            if (columns != null && !columns.contains(DORIS_DELETE_SIGN) && 
enableBatchDelete()) {
+                columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
+                props.put(COLUMNS_KEY, columns);
+            }
         }
     }
 
@@ -235,7 +259,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         dorisStreamLoad = new DorisStreamLoad(getBackend(), 
options.getUsername(), options.getPassword(), loadProps);
         if (!multipleSink) {
             this.jsonFormat = true;
-            handleStreamLoadProp();
+            // handleStreamLoadProp();
             this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
             for (int i = 0; i < logicalTypes.length; i++) {
                 fieldGetters[i] = 
DorisParseUtils.createFieldGetter(logicalTypes[i], i);
@@ -266,13 +290,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                 metricData.registerSubMetricsGroup(metricState);
             }
         }
-        if (dirtySink != null) {
-            try {
-                dirtySink.open(new Configuration());
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
+        dirtySinkHelper.open(new Configuration());
         if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
             this.scheduler = new ScheduledThreadPoolExecutor(1,
                     new 
ExecutorThreadFactory("doris-streamload-output-format"));
@@ -286,10 +304,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
 
     private boolean checkFlushException(String tableIdentifier) {
         Exception ex = flushExceptionMap.get(tableIdentifier);
-        if (!multipleSink || ex == null) {
+        if (!multipleSink || ex == null || 
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == schemaUpdatePolicy) {
             return false;
         }
-        if (!ignoreSingleTableErrors) {
+        if (SchemaUpdateExceptionPolicy.THROW_WITH_STOP == schemaUpdatePolicy) 
{
             throw new RuntimeException("Writing records to streamload failed, 
tableIdentifier=" + tableIdentifier, ex);
         }
         return true;
@@ -340,7 +358,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                 batchMap.putIfAbsent(tableIdentifier, mapData);
             } catch (Exception e) {
                 LOG.error(String.format("serialize error, raw data: %s", row), 
e);
-                handleDirtyData(row, DirtyType.SERIALIZE_ERROR, e);
+                if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
schemaUpdatePolicy) {
+                    handleDirtyData(row, DirtyType.SERIALIZE_ERROR, e);
+                }
             }
         } else if (row instanceof String) {
             batchBytes += ((String) 
row).getBytes(StandardCharsets.UTF_8).length;
@@ -349,8 +369,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             batchMap.putIfAbsent(tableIdentifier, mapData);
         } else {
             LOG.error(String.format("The type of element should be 'RowData' 
or 'String' only., raw data: %s", row));
-            handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
-                    new RuntimeException("The type of element should be 
'RowData' or 'String' only."));
+            if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
schemaUpdatePolicy) {
+                handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
+                        new RuntimeException("The type of element should be 
'RowData' or 'String' only."));
+            }
         }
 
     }
@@ -368,7 +390,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                 rootNode = 
jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
             } catch (Exception e) {
                 LOG.error(String.format("deserialize error, raw data: %s", new 
String(rowData.getBinary(0))), e);
-                handleDirtyData(new String(rowData.getBinary(0)), 
DirtyType.DESERIALIZE_ERROR, e);
+                if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
schemaUpdatePolicy) {
+                    handleDirtyData(new String(rowData.getBinary(0)), 
DirtyType.DESERIALIZE_ERROR, e);
+                }
                 return;
             }
             boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
@@ -401,7 +425,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                 }
             } catch (Exception e) {
                 LOG.error(String.format("json parse error, raw data: %s", new 
String(rowData.getBinary(0))), e);
-                handleDirtyData(new String(rowData.getBinary(0)), 
DirtyType.JSON_PROCESS_ERROR, e);
+                if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
schemaUpdatePolicy) {
+                    handleDirtyData(new String(rowData.getBinary(0)), 
DirtyType.JSON_PROCESS_ERROR, e);
+                }
                 return;
             }
             for (int i = 0; i < physicalDataList.size(); i++) {
@@ -417,17 +443,26 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             }
         } else {
             LOG.error(String.format("The type of element should be 'RowData' 
only, raw data: %s", row));
-            handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
-                    new RuntimeException("The type of element should be 
'RowData' only."));
+            if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
schemaUpdatePolicy) {
+                handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
+                        new RuntimeException("The type of element should be 
'RowData' only."));
+            }
         }
     }
 
     @SuppressWarnings({"unchecked"})
     private void addRow(RowKind rowKind, JsonNode rootNode, JsonNode 
physicalNode, JsonNode updateBeforeNode,
             Map<String, String> physicalData, Map<String, String> 
updateBeforeData) throws IOException {
-        String tableIdentifier = StringUtils.join(
-                jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
-                jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+        String database = jsonDynamicSchemaFormat.parse(rootNode, 
databasePattern);
+        String table = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);
+        String tableIdentifier = StringUtils.join(database, ".", table);
+        if (dirtySinkHelper.getDirtySink() != null) {
+            try {
+                fillDirtySink(rootNode, physicalData, updateBeforeData, 
database, table);
+            } catch (Exception e) {
+                LOG.warn("fill dirty sink parameters failed");
+            }
+        }
         switch (rowKind) {
             case INSERT:
             case UPDATE_AFTER:
@@ -469,78 +504,82 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         }
     }
 
+    private void fillDirtySink(JsonNode rootNode, Map<String, String> 
physicalData,
+            Map<String, String> updateBeforeData, String database, String 
table)
+            throws IOException {
+        DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+        String dirtyLabel = null;
+        String dirtyLogTag = null;
+        String dirtyIdentifier = null;
+        if (dirtyOptions.ignoreDirty()) {
+            dirtyLabel = jsonDynamicSchemaFormat.parse(rootNode,
+                    DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), 
DirtyType.BATCH_LOAD_ERROR, null));
+            dirtyLogTag = jsonDynamicSchemaFormat.parse(rootNode,
+                    DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), 
DirtyType.BATCH_LOAD_ERROR, null));
+            dirtyIdentifier = jsonDynamicSchemaFormat.parse(rootNode,
+                    DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), 
DirtyType.BATCH_LOAD_ERROR, null));
+        }
+        physicalData.put(DIRTY_LOG_TAG, dirtyLogTag);
+        physicalData.put(DIRTY_IDENTIFIER, dirtyIdentifier);
+        physicalData.put(DIRTY_LABEL, dirtyLabel);
+        physicalData.put(DATABASE, database);
+        physicalData.put(TABLE, table);
+        if (updateBeforeData != null) {
+            updateBeforeData.put(DIRTY_LOG_TAG, dirtyLogTag);
+            updateBeforeData.put(DIRTY_IDENTIFIER, dirtyIdentifier);
+            updateBeforeData.put(DIRTY_LABEL, dirtyLabel);
+            updateBeforeData.put(DATABASE, database);
+            updateBeforeData.put(TABLE, table);
+        }
+    }
+
     private void handleDirtyData(Object dirtyData, DirtyType dirtyType, 
Exception e) {
         errorNum.incrementAndGet();
-        if (!dirtyOptions.ignoreDirty()) {
-            RuntimeException ex;
-            if (e instanceof RuntimeException) {
-                ex = (RuntimeException) e;
-            } else {
-                ex = new RuntimeException(e);
-            }
-            throw ex;
-        }
 
         if (multipleSink) {
             if (dirtyType == DirtyType.DESERIALIZE_ERROR) {
                 LOG.error("database and table can't be identified, will use 
default ${database}${table}");
             } else {
-                handleMultipleDirtyData(dirtyData, dirtyType, e);
+                try {
+                    handleMultipleDirtyData(dirtyData, dirtyType, e);
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
+                }
                 return;
             }
         }
 
-        if (dirtySink != null) {
-            DirtyData.Builder<Object> builder = DirtyData.builder();
-            try {
-                builder.setData(dirtyData)
-                        .setDirtyType(dirtyType)
-                        .setLabels(dirtyOptions.getLabels())
-                        .setLogTag(dirtyOptions.getLogTag())
-                        .setDirtyMessage(e.getMessage())
-                        .setIdentifier(dirtyOptions.getIdentifier());
-                dirtySink.invoke(builder.build());
-            } catch (Exception ex) {
-                if (!dirtyOptions.ignoreSideOutputErrors()) {
-                    throw new RuntimeException(ex);
-                }
-                LOG.warn("Dirty sink failed", ex);
-            }
+        DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+        if (dirtyOptions.ignoreDirty()) {
+            dirtySinkHelper.invoke(dirtyData, dirtyType, 
dirtyOptions.getLabels(), dirtyOptions.getLogTag(),
+                    dirtyOptions.getIdentifier(), e);
         }
+
         metricData.invokeDirty(1, 
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
     }
 
-    private void handleMultipleDirtyData(Object dirtyData, DirtyType 
dirtyType, Exception e) {
-        JsonNode rootNode;
-        try {
-            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) 
dirtyData).getBinary(0));
-        } catch (Exception ex) {
-            handleDirtyData(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
-            return;
+    private void handleMultipleDirtyData(Object dirtyData, DirtyType 
dirtyType, Exception e)
+            throws JsonProcessingException {
+        Map<String, String> rawData;
+        if (dirtyData instanceof Map) {
+            rawData = (Map) dirtyData;
+        } else {
+            rawData = 
OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsString(dirtyData), Map.class);
         }
+        String label = rawData.remove(DIRTY_LABEL);
+        String logTag = rawData.remove(DIRTY_LOG_TAG);
+        String identifier = rawData.remove(DIRTY_IDENTIFIER);
+        String database = rawData.remove(DATABASE);
+        String table = rawData.remove(TABLE);
+        String content = OBJECT_MAPPER.writeValueAsString(rawData);
 
-        if (dirtySink != null) {
-            DirtyData.Builder<Object> builder = DirtyData.builder();
-            try {
-                builder.setData(dirtyData)
-                        .setDirtyType(dirtyType)
-                        .setLabels(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLabels()))
-                        .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLogTag()))
-                        .setDirtyMessage(e.getMessage())
-                        .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getIdentifier()));
-                dirtySink.invoke(builder.build());
-            } catch (Exception ex) {
-                if (!dirtyOptions.ignoreSideOutputErrors()) {
-                    throw new RuntimeException(ex);
-                }
-                LOG.warn("Dirty sink failed", ex);
-            }
+        if (dirtySinkHelper.getDirtyOptions().ignoreDirty()) {
+            dirtySinkHelper.invoke(OBJECT_MAPPER.readTree(content), dirtyType, 
label, logTag, identifier, e);
         }
+
         try {
-            metricData.outputDirtyMetricsWithEstimate(
-                    jsonDynamicSchemaFormat.parse(rootNode, databasePattern),
-                    jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1,
-                    ((RowData) dirtyData).getBinary(0).length);
+            metricData.outputDirtyMetricsWithEstimate(database, table, 1,
+                    content.getBytes(StandardCharsets.UTF_8).length);
         } catch (Exception ex) {
             metricData.invokeDirty(1, 
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
         }
@@ -645,15 +684,17 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             return;
         }
         String loadValue = null;
-        RespContent respContent = null;
+        RespContent respContent;
         try {
-            loadValue = OBJECT_MAPPER.writeValueAsString(values);
+            // support csv and json format
+            String format = 
executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY, FORMAT_JSON_VALUE);
+            loadValue = serialize(values, format);
             respContent = load(tableIdentifier, loadValue);
             try {
                 if (null != metricData && null != respContent) {
                     if (multipleSink) {
                         String[] tableWithDb = tableIdentifier.split("\\.");
-                        metricData.outputMetrics(tableWithDb[0], null, 
tableWithDb[1],
+                        metricData.outputMetrics(tableWithDb[0], 
tableWithDb[1],
                                 respContent.getNumberLoadedRows(), 
respContent.getLoadBytes());
                     } else {
                         metricData.invoke(respContent.getNumberLoadedRows(), 
respContent.getLoadBytes());
@@ -667,34 +708,122 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             values.clear();
         } catch (Exception e) {
             LOG.error(String.format("Flush table: %s error", tableIdentifier), 
e);
-            // Makesure it is a dirty data
-            if (respContent == null || 
StringUtils.isNotBlank(respContent.getErrorURL())) {
-                flushExceptionMap.put(tableIdentifier, e);
-                errorNum.getAndAdd(values.size());
+            flushExceptionMap.put(tableIdentifier, e);
+            // may count repeatedly
+            errorNum.getAndAdd(values.size());
+
+            if (!multipleSink) {
+                try {
+                    handleSingleTable(e, values, loadValue);
+                    return;
+                } catch (Exception ex) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            if (SchemaUpdateExceptionPolicy.THROW_WITH_STOP == 
schemaUpdatePolicy) {
+                throw new RuntimeException(
+                        String.format("Writing records to streamload of 
tableIdentifier:%s failed, the value: %s.",
+                                tableIdentifier, loadValue),
+                        e);
+            }
+            if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == 
schemaUpdatePolicy) {
+                errorTables.add(tableIdentifier);
+                LOG.warn("The tableIdentifier: {} load failed and the data 
will be throw away in the future "
+                        + "because the option 
'sink.multiple.schema-update.policy' is 'STOP_PARTIAL'",
+                        tableIdentifier);
+                return;
+            }
+            if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
schemaUpdatePolicy) {
+                errorTables.add(tableIdentifier);
+                // archive dirty data when 
'sink.multiple.schema-update.policy' is 'LOG_WITH_IGNORE'
                 for (Object value : values) {
                     try {
                         
handleDirtyData(OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(value)),
                                 DirtyType.BATCH_LOAD_ERROR, e);
-                    } catch (IOException ex) {
-                        if (!dirtyOptions.ignoreSideOutputErrors()) {
+                    } catch (Exception ex) {
+                        if 
(!dirtySinkHelper.getDirtyOptions().ignoreSideOutputErrors()) {
                             throw new RuntimeException(ex);
                         }
                         LOG.warn("Dirty sink failed", ex);
                     }
                 }
-                if (!ignoreSingleTableErrors) {
-                    throw new RuntimeException(
-                            String.format("Writing records to streamload of 
tableIdentifier:%s failed, the value: %s.",
-                                    tableIdentifier, loadValue),
-                            e);
+            }
+
+            values.clear();
+        }
+    }
+
+    private void handleSingleTable(Exception e, List values, String loadValue) 
{
+        for (Object value : values) {
+            try {
+                
handleDirtyData(OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(value)),
+                        DirtyType.BATCH_LOAD_ERROR, e);
+            } catch (IOException ex) {
+                if 
(!dirtySinkHelper.getDirtyOptions().ignoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
                 }
-                errorTables.add(tableIdentifier);
-                LOG.warn("The tableIdentifier: {} load failed and the data 
will be throw away in the future"
-                        + " because the option 
'sink.multiple.ignore-single-table-errors' is 'true'", tableIdentifier);
-            } else {
-                throw new RuntimeException(e);
+                LOG.warn("Dirty sink failed", ex);
             }
         }
+        if (!ignoreSingleTableErrors) {
+            throw new RuntimeException(
+                    String.format("Writing records to streamload of 
tableIdentifier:%s failed, the value: %s.",
+                            tableIdentifier, loadValue),
+                    e);
+        }
+        errorTables.add(tableIdentifier);
+        LOG.warn("The tableIdentifier: {} load failed and the data will be 
throw away in the future"
+                + " because the option 
'sink.multiple.ignore-single-table-errors' is 'true'", tableIdentifier);
+    }
+
+    /**
+     * format data to csv or json
+     *
+     * @param values
+     * @param format
+     * @return string
+     * @throws JsonProcessingException
+     */
+    private String serialize(List values, String format) throws 
JsonProcessingException {
+        if (FORMAT_CSV_VALUE.equalsIgnoreCase(format)) {
+            LOG.info("doris data format: {}", format);
+            // set columns, and format json data to csv
+            String columns = null;
+            StringBuilder csvData = new StringBuilder();
+            for (Object item : values) {
+                if (item instanceof Map) {
+                    Map<String, String> map = (Map<String, String>) item;
+                    Set<String> fieldNameSet = new 
LinkedHashSet<>(map.keySet());
+                    if (columns == null) {
+                        // when single table synchronizing, parameter 
`sink.properties.columns` in options may
+                        // contain hll or bitmap function.
+                        // columns: dt,id,name,province,os, pv=hll_hash(id)
+                        if (this.columns != null) {
+                            for (String fieldName : this.columns.split(",")) {
+                                if (fieldName.contains("=")) {
+                                    fieldNameSet.add(fieldName);
+                                }
+                            }
+                        }
+                        columns = StringUtils.join(fieldNameSet, ",");
+                        executionOptions.getStreamLoadProp().put(COLUMNS_KEY, 
columns);
+                    }
+                    int idx = 0;
+                    int len = map.values().size();
+                    for (String val : map.values()) {
+                        csvData.append(null == val ? "\\N" : val);
+                        if (idx++ < len - 1) {
+                            csvData.append(this.fieldDelimiter);
+                        }
+                    }
+                    csvData.append(this.lineDelimiter);
+                }
+            }
+            return csvData.toString();
+        } else {
+            return OBJECT_MAPPER.writeValueAsString(values);
+        }
     }
 
     @SuppressWarnings("rawtypes")
@@ -715,10 +844,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     private RespContent load(String tableIdentifier, String result) throws 
IOException {
         String[] tableWithDb = tableIdentifier.split("\\.");
         RespContent respContent = null;
-        // Dynamic set COLUMNS_KEY for tableIdentifier every time for multiple 
sink scenario
-        if (multipleSink) {
-            executionOptions.getStreamLoadProp().put(COLUMNS_KEY, 
columnsMap.get(tableIdentifier));
-        }
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
                 respContent = dorisStreamLoad.load(tableWithDb[0], 
tableWithDb[1], result);
@@ -784,6 +909,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
         private String databasePattern;
         private String tablePattern;
         private boolean ignoreSingleTableErrors;
+        private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
         private boolean multipleSink;
         private String inlongMetric;
         private String auditHostAndPorts;
@@ -837,8 +963,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             return this;
         }
 
-        public DorisDynamicSchemaOutputFormat.Builder setDynamicSchemaFormat(
-                String dynamicSchemaFormat) {
+        public DorisDynamicSchemaOutputFormat.Builder 
setDynamicSchemaFormat(String dynamicSchemaFormat) {
             this.dynamicSchemaFormat = dynamicSchemaFormat;
             return this;
         }
@@ -883,6 +1008,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             return this;
         }
 
+        public DorisDynamicSchemaOutputFormat.Builder setSchemaUpdatePolicy(
+                SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+            this.schemaUpdatePolicy = schemaUpdatePolicy;
+            return this;
+        }
+
         @SuppressWarnings({"rawtypes"})
         public DorisDynamicSchemaOutputFormat build() {
             LogicalType[] logicalTypes = null;
@@ -891,9 +1022,22 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                         
.map(DataType::getLogicalType).toArray(LogicalType[]::new);
             }
             return new DorisDynamicSchemaOutputFormat(
-                    
optionsBuilder.setTableIdentifier(tableIdentifier).build(), readOptions, 
executionOptions,
-                    tableIdentifier, logicalTypes, fieldNames, 
dynamicSchemaFormat, databasePattern, tablePattern,
-                    ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, 
multipleSink, dirtyOptions, dirtySink);
+                    optionsBuilder.setTableIdentifier(tableIdentifier).build(),
+                    readOptions,
+                    executionOptions,
+                    tableIdentifier,
+                    logicalTypes,
+                    fieldNames,
+                    dynamicSchemaFormat,
+                    databasePattern,
+                    tablePattern,
+                    ignoreSingleTableErrors,
+                    schemaUpdatePolicy,
+                    inlongMetric,
+                    auditHostAndPorts,
+                    multipleSink,
+                    dirtyOptions,
+                    dirtySink);
         }
     }
 }
diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index e5e482c29..7e0addaeb 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
 import static 
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
@@ -63,6 +64,7 @@ import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTE
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
+import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 
 /**
@@ -147,7 +149,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
     private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = 
ConfigOptions
             .key("sink.batch.size")
             .intType()
-            .defaultValue(100)
+            .defaultValue(1024)
             .withDescription("the flush max size (includes all append, upsert 
and delete records), over this number"
+                    + " of records, will flush data. The default value is 1024.");
     private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
@@ -215,6 +217,7 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_ENABLE);
         options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
+        options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
         options.add(INLONG_METRIC);
         options.add(INLONG_AUDIT);
         options.add(FactoryUtil.SINK_PARALLELISM);
@@ -302,6 +305,8 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
         String tablePattern = 
helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
         boolean multipleSink = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
         boolean ignoreSingleTableErrors = 
helper.getOptions().get(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
+        SchemaUpdateExceptionPolicy schemaUpdatePolicy = helper.getOptions()
+                
.getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(SchemaUpdateExceptionPolicy.THROW_WITH_STOP);
         String sinkMultipleFormat = 
helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
         validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
                 multipleSink, sinkMultipleFormat, databasePattern, 
tablePattern);
@@ -316,9 +321,18 @@ public final class DorisDynamicTableFactory implements 
DynamicTableSourceFactory
                 getDorisOptions(helper.getOptions()),
                 getDorisReadOptions(helper.getOptions()),
                 getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
-                physicalSchema, multipleSink, sinkMultipleFormat, 
databasePattern,
-                tablePattern, ignoreSingleTableErrors, inlongMetric, 
auditHostAndPorts, parallelism,
-                dirtyOptions, dirtySink);
+                physicalSchema,
+                multipleSink,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern,
+                ignoreSingleTableErrors,
+                schemaUpdatePolicy,
+                inlongMetric,
+                auditHostAndPorts,
+                parallelism,
+                dirtyOptions,
+                dirtySink);
     }
 
     private void validateSinkMultiple(DataType physicalDataType, boolean 
multipleSink, String sinkMultipleFormat,
diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index 8e6527c12..e04bd6704 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.doris.internal.GenericDorisSinkFunction;
 
 import javax.annotation.Nullable;
@@ -45,6 +46,7 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
     private final String databasePattern;
     private final String tablePattern;
     private final boolean ignoreSingleTableErrors;
+    private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
     private final String inlongMetric;
     private final String auditHostAndPorts;
     private final Integer parallelism;
@@ -60,6 +62,7 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
             String databasePattern,
             String tablePattern,
             boolean ignoreSingleTableErrors,
+            SchemaUpdateExceptionPolicy schemaUpdatePolicy,
             String inlongMetric,
             String auditHostAndPorts,
             Integer parallelism,
@@ -74,6 +77,7 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
         this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+        this.schemaUpdatePolicy = schemaUpdatePolicy;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         this.parallelism = parallelism;
@@ -107,6 +111,7 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
                 .setTablePattern(tablePattern)
                 .setDynamicSchemaFormat(sinkMultipleFormat)
                 .setIgnoreSingleTableErrors(ignoreSingleTableErrors)
+                .setSchemaUpdatePolicy(schemaUpdatePolicy)
                 .setDirtyOptions(dirtyOptions)
                 .setDirtySink(dirtySink);
         return SinkFunctionProvider.of(
@@ -115,9 +120,21 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
 
     @Override
     public DynamicTableSink copy() {
-        return new DorisDynamicTableSink(options, readOptions, 
executionOptions, tableSchema,
-                multipleSink, sinkMultipleFormat, databasePattern, 
tablePattern, ignoreSingleTableErrors,
-                inlongMetric, auditHostAndPorts, parallelism, dirtyOptions, 
dirtySink);
+        return new DorisDynamicTableSink(options,
+                readOptions,
+                executionOptions,
+                tableSchema,
+                multipleSink,
+                sinkMultipleFormat,
+                databasePattern,
+                tablePattern,
+                ignoreSingleTableErrors,
+                schemaUpdatePolicy,
+                inlongMetric,
+                auditHostAndPorts,
+                parallelism,
+                dirtyOptions,
+                dirtySink);
     }
 
     @Override
diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
index deaa17588..a413e17da 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
@@ -120,6 +120,7 @@ public class DorisStreamLoad implements Serializable {
 
         try {
             final String loadUrlStr = String.format(LOAD_URL_PATTERN, 
hostPort, db, tbl);
+            LOG.info("Stream load url: {}", loadUrlStr);
             HttpPut put = new HttpPut(loadUrlStr);
             put.setHeader(HttpHeaders.EXPECT, "100-continue");
             put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding);
@@ -129,7 +130,9 @@ public class DorisStreamLoad implements Serializable {
                     put.setHeader(String.valueOf(entry.getKey()), 
String.valueOf(entry.getValue()));
                 }
             }
-            put.setHeader("format", "json");
+            if (!put.containsHeader("format")) {
+                put.setHeader("format", "json");
+            }
             put.setHeader("strip_outer_array", "true");
             StringEntity entity = new StringEntity(value, "UTF-8");
             put.setEntity(entity);


Reply via email to