This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0039d3ed9 [INLONG-7584][Sort] Doris connector supports writing CSV and
archiving dirty data (#7585)
0039d3ed9 is described below
commit 0039d3ed92fd12d5d04e7d4472af69d18cfa7337
Author: Liao Rui <[email protected]>
AuthorDate: Wed Mar 15 16:13:51 2023 +0800
[INLONG-7584][Sort] Doris connector supports writing CSV and archiving
dirty data (#7585)
Co-authored-by: ryanrliao <[email protected]>
---
.../table/DorisDynamicSchemaOutputFormat.java | 380 ++++++++++++++-------
.../sort/doris/table/DorisDynamicTableFactory.java | 22 +-
.../sort/doris/table/DorisDynamicTableSink.java | 23 +-
.../inlong/sort/doris/table/DorisStreamLoad.java | 5 +-
4 files changed, 304 insertions(+), 126 deletions(-)
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 2ee979d15..c056a92ee 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.doris.table;
+import java.util.LinkedHashSet;
+import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
@@ -25,7 +27,6 @@ import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.StreamLoadException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.Schema;
-import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -35,15 +36,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
@@ -51,6 +54,7 @@ import
org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.doris.model.RespContent;
import org.apache.inlong.sort.doris.util.DorisParseUtils;
@@ -94,6 +98,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final String COLUMNS_KEY = "columns";
private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
+ private static final String DIRTY_LOG_TAG = "__DIRTY_LOG_TAG__";
+ private static final String DIRTY_LABEL = "__DIRTY_LABEL__";
+ private static final String DIRTY_IDENTIFIER = "__DIRTY_IDENTIFIER__";
+ private static final String DATABASE = "__DATABASE__";
+ private static final String TABLE = "__TABLE__";
/**
* Mark the record for delete
*/
@@ -102,6 +111,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
* Mark the record for not delete
*/
private static final String DORIS_DELETE_FALSE = "0";
+ private static final String FORMAT_JSON_VALUE = "json";
+ private static final String FORMAT_CSV_VALUE = "csv";
+ private static final String FORMAT_KEY = "format";
private static final String FIELD_DELIMITER_KEY = "column_separator";
private static final String FIELD_DELIMITER_DEFAULT = "\t";
private static final String LINE_DELIMITER_KEY = "line_delimiter";
@@ -113,6 +125,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
@SuppressWarnings({"rawtypes"})
private final Map<String, List> batchMap = new HashMap<>();
private final Map<String, String> columnsMap = new HashMap<>();
+ /**
+ * data will not be submitted when table is in errorTables list
+ */
private final List<String> errorTables = new ArrayList<>();
private final DorisOptions options;
private final DorisReadOptions readOptions;
@@ -130,6 +145,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private final String tablePattern;
private final String dynamicSchemaFormat;
private final boolean ignoreSingleTableErrors;
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private long batchBytes = 0L;
private int size;
private DorisStreamLoad dorisStreamLoad;
@@ -146,9 +162,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private volatile RowData.FieldGetter[] fieldGetters;
private String fieldDelimiter;
private String lineDelimiter;
+ private String columns;
private final LogicalType[] logicalTypes;
- private final DirtyOptions dirtyOptions;
- private @Nullable final DirtySink<Object> dirtySink;
+ private DirtySinkHelper<Object> dirtySinkHelper;
private transient Schema schema;
public DorisDynamicSchemaOutputFormat(DorisOptions option,
@@ -161,6 +177,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
String databasePattern,
String tablePattern,
boolean ignoreSingleTableErrors,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
String inlongMetric,
String auditHostAndPorts,
boolean multipleSink,
@@ -179,8 +196,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
this.ignoreSingleTableErrors = ignoreSingleTableErrors;
- this.dirtyOptions = dirtyOptions;
- this.dirtySink = dirtySink;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+ this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
+
+ handleStreamLoadProp();
}
/**
@@ -203,18 +222,23 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
props.remove(ESCAPE_DELIMITERS_KEY);
}
- // add column key when fieldNames is not empty
- if (!props.containsKey(COLUMNS_KEY) && fieldNames != null &&
fieldNames.length > 0) {
- String columns = Arrays.stream(fieldNames)
- .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
- .collect(Collectors.joining(","));
- props.put(COLUMNS_KEY, columns);
- }
+ // save `sink.properties.columns` parameter from options
+ this.columns = (String) props.get(COLUMNS_KEY);
+ if (!multipleSink) {
+ // add column key when fieldNames is not empty
+ if (!props.containsKey(COLUMNS_KEY) && fieldNames != null &&
fieldNames.length > 0) {
+ String columns =
+ Arrays.stream(fieldNames).map(item ->
String.format("`%s`", item.trim().replace("`", "")))
+ .collect(Collectors.joining(","));
+ props.put(COLUMNS_KEY, columns);
+ }
- // if enable batch delete, the columns must add tag
'__DORIS_DELETE_SIGN__'
- String columns = (String) props.get(COLUMNS_KEY);
- if (!columns.contains(DORIS_DELETE_SIGN) && enableBatchDelete()) {
- props.put(COLUMNS_KEY, String.format("%s,%s", columns,
DORIS_DELETE_SIGN));
+ // if enable batch delete, the columns must add tag
'__DORIS_DELETE_SIGN__'
+ String columns = (String) props.get(COLUMNS_KEY);
+ if (columns != null && !columns.contains(DORIS_DELETE_SIGN) &&
enableBatchDelete()) {
+ columns = String.format("%s,%s", columns, DORIS_DELETE_SIGN);
+ props.put(COLUMNS_KEY, columns);
+ }
}
}
@@ -235,7 +259,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
dorisStreamLoad = new DorisStreamLoad(getBackend(),
options.getUsername(), options.getPassword(), loadProps);
if (!multipleSink) {
this.jsonFormat = true;
- handleStreamLoadProp();
+ // handleStreamLoadProp();
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
fieldGetters[i] =
DorisParseUtils.createFieldGetter(logicalTypes[i], i);
@@ -266,13 +290,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
metricData.registerSubMetricsGroup(metricState);
}
}
- if (dirtySink != null) {
- try {
- dirtySink.open(new Configuration());
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
+ dirtySinkHelper.open(new Configuration());
if (executionOptions.getBatchIntervalMs() != 0 &&
executionOptions.getBatchSize() != 1) {
this.scheduler = new ScheduledThreadPoolExecutor(1,
new
ExecutorThreadFactory("doris-streamload-output-format"));
@@ -286,10 +304,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private boolean checkFlushException(String tableIdentifier) {
Exception ex = flushExceptionMap.get(tableIdentifier);
- if (!multipleSink || ex == null) {
+ if (!multipleSink || ex == null ||
SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == schemaUpdatePolicy) {
return false;
}
- if (!ignoreSingleTableErrors) {
+ if (SchemaUpdateExceptionPolicy.THROW_WITH_STOP == schemaUpdatePolicy)
{
throw new RuntimeException("Writing records to streamload failed,
tableIdentifier=" + tableIdentifier, ex);
}
return true;
@@ -340,7 +358,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
batchMap.putIfAbsent(tableIdentifier, mapData);
} catch (Exception e) {
LOG.error(String.format("serialize error, raw data: %s", row),
e);
- handleDirtyData(row, DirtyType.SERIALIZE_ERROR, e);
+ if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
schemaUpdatePolicy) {
+ handleDirtyData(row, DirtyType.SERIALIZE_ERROR, e);
+ }
}
} else if (row instanceof String) {
batchBytes += ((String)
row).getBytes(StandardCharsets.UTF_8).length;
@@ -349,8 +369,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
batchMap.putIfAbsent(tableIdentifier, mapData);
} else {
LOG.error(String.format("The type of element should be 'RowData'
or 'String' only., raw data: %s", row));
- handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
- new RuntimeException("The type of element should be
'RowData' or 'String' only."));
+ if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
schemaUpdatePolicy) {
+ handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
+ new RuntimeException("The type of element should be
'RowData' or 'String' only."));
+ }
}
}
@@ -368,7 +390,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
rootNode =
jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
} catch (Exception e) {
LOG.error(String.format("deserialize error, raw data: %s", new
String(rowData.getBinary(0))), e);
- handleDirtyData(new String(rowData.getBinary(0)),
DirtyType.DESERIALIZE_ERROR, e);
+ if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
schemaUpdatePolicy) {
+ handleDirtyData(new String(rowData.getBinary(0)),
DirtyType.DESERIALIZE_ERROR, e);
+ }
return;
}
boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
@@ -401,7 +425,9 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
} catch (Exception e) {
LOG.error(String.format("json parse error, raw data: %s", new
String(rowData.getBinary(0))), e);
- handleDirtyData(new String(rowData.getBinary(0)),
DirtyType.JSON_PROCESS_ERROR, e);
+ if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
schemaUpdatePolicy) {
+ handleDirtyData(new String(rowData.getBinary(0)),
DirtyType.JSON_PROCESS_ERROR, e);
+ }
return;
}
for (int i = 0; i < physicalDataList.size(); i++) {
@@ -417,17 +443,26 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
} else {
LOG.error(String.format("The type of element should be 'RowData'
only, raw data: %s", row));
- handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
- new RuntimeException("The type of element should be
'RowData' only."));
+ if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
schemaUpdatePolicy) {
+ handleDirtyData(row, DirtyType.UNSUPPORTED_DATA_TYPE,
+ new RuntimeException("The type of element should be
'RowData' only."));
+ }
}
}
@SuppressWarnings({"unchecked"})
private void addRow(RowKind rowKind, JsonNode rootNode, JsonNode
physicalNode, JsonNode updateBeforeNode,
Map<String, String> physicalData, Map<String, String>
updateBeforeData) throws IOException {
- String tableIdentifier = StringUtils.join(
- jsonDynamicSchemaFormat.parse(rootNode, databasePattern), ".",
- jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+ String database = jsonDynamicSchemaFormat.parse(rootNode,
databasePattern);
+ String table = jsonDynamicSchemaFormat.parse(rootNode, tablePattern);
+ String tableIdentifier = StringUtils.join(database, ".", table);
+ if (dirtySinkHelper.getDirtySink() != null) {
+ try {
+ fillDirtySink(rootNode, physicalData, updateBeforeData,
database, table);
+ } catch (Exception e) {
+ LOG.warn("fill dirty sink parameters failed");
+ }
+ }
switch (rowKind) {
case INSERT:
case UPDATE_AFTER:
@@ -469,78 +504,82 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
}
+ private void fillDirtySink(JsonNode rootNode, Map<String, String>
physicalData,
+ Map<String, String> updateBeforeData, String database, String
table)
+ throws IOException {
+ DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+ String dirtyLabel = null;
+ String dirtyLogTag = null;
+ String dirtyIdentifier = null;
+ if (dirtyOptions.ignoreDirty()) {
+ dirtyLabel = jsonDynamicSchemaFormat.parse(rootNode,
+ DirtySinkHelper.regexReplace(dirtyOptions.getLabels(),
DirtyType.BATCH_LOAD_ERROR, null));
+ dirtyLogTag = jsonDynamicSchemaFormat.parse(rootNode,
+ DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(),
DirtyType.BATCH_LOAD_ERROR, null));
+ dirtyIdentifier = jsonDynamicSchemaFormat.parse(rootNode,
+ DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(),
DirtyType.BATCH_LOAD_ERROR, null));
+ }
+ physicalData.put(DIRTY_LOG_TAG, dirtyLogTag);
+ physicalData.put(DIRTY_IDENTIFIER, dirtyIdentifier);
+ physicalData.put(DIRTY_LABEL, dirtyLabel);
+ physicalData.put(DATABASE, database);
+ physicalData.put(TABLE, table);
+ if (updateBeforeData != null) {
+ updateBeforeData.put(DIRTY_LOG_TAG, dirtyLogTag);
+ updateBeforeData.put(DIRTY_IDENTIFIER, dirtyIdentifier);
+ updateBeforeData.put(DIRTY_LABEL, dirtyLabel);
+ updateBeforeData.put(DATABASE, database);
+ updateBeforeData.put(TABLE, table);
+ }
+ }
+
private void handleDirtyData(Object dirtyData, DirtyType dirtyType,
Exception e) {
errorNum.incrementAndGet();
- if (!dirtyOptions.ignoreDirty()) {
- RuntimeException ex;
- if (e instanceof RuntimeException) {
- ex = (RuntimeException) e;
- } else {
- ex = new RuntimeException(e);
- }
- throw ex;
- }
if (multipleSink) {
if (dirtyType == DirtyType.DESERIALIZE_ERROR) {
LOG.error("database and table can't be identified, will use
default ${database}${table}");
} else {
- handleMultipleDirtyData(dirtyData, dirtyType, e);
+ try {
+ handleMultipleDirtyData(dirtyData, dirtyType, e);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
return;
}
}
- if (dirtySink != null) {
- DirtyData.Builder<Object> builder = DirtyData.builder();
- try {
- builder.setData(dirtyData)
- .setDirtyType(dirtyType)
- .setLabels(dirtyOptions.getLabels())
- .setLogTag(dirtyOptions.getLogTag())
- .setDirtyMessage(e.getMessage())
- .setIdentifier(dirtyOptions.getIdentifier());
- dirtySink.invoke(builder.build());
- } catch (Exception ex) {
- if (!dirtyOptions.ignoreSideOutputErrors()) {
- throw new RuntimeException(ex);
- }
- LOG.warn("Dirty sink failed", ex);
- }
+ DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+ if (dirtyOptions.ignoreDirty()) {
+ dirtySinkHelper.invoke(dirtyData, dirtyType,
dirtyOptions.getLabels(), dirtyOptions.getLogTag(),
+ dirtyOptions.getIdentifier(), e);
}
+
metricData.invokeDirty(1,
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
}
- private void handleMultipleDirtyData(Object dirtyData, DirtyType
dirtyType, Exception e) {
- JsonNode rootNode;
- try {
- rootNode = jsonDynamicSchemaFormat.deserialize(((RowData)
dirtyData).getBinary(0));
- } catch (Exception ex) {
- handleDirtyData(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
- return;
+ private void handleMultipleDirtyData(Object dirtyData, DirtyType
dirtyType, Exception e)
+ throws JsonProcessingException {
+ Map<String, String> rawData;
+ if (dirtyData instanceof Map) {
+ rawData = (Map) dirtyData;
+ } else {
+ rawData =
OBJECT_MAPPER.readValue(OBJECT_MAPPER.writeValueAsString(dirtyData), Map.class);
}
+ String label = rawData.remove(DIRTY_LABEL);
+ String logTag = rawData.remove(DIRTY_LOG_TAG);
+ String identifier = rawData.remove(DIRTY_IDENTIFIER);
+ String database = rawData.remove(DATABASE);
+ String table = rawData.remove(TABLE);
+ String content = OBJECT_MAPPER.writeValueAsString(rawData);
- if (dirtySink != null) {
- DirtyData.Builder<Object> builder = DirtyData.builder();
- try {
- builder.setData(dirtyData)
- .setDirtyType(dirtyType)
- .setLabels(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getLabels()))
- .setLogTag(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getLogTag()))
- .setDirtyMessage(e.getMessage())
- .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getIdentifier()));
- dirtySink.invoke(builder.build());
- } catch (Exception ex) {
- if (!dirtyOptions.ignoreSideOutputErrors()) {
- throw new RuntimeException(ex);
- }
- LOG.warn("Dirty sink failed", ex);
- }
+ if (dirtySinkHelper.getDirtyOptions().ignoreDirty()) {
+ dirtySinkHelper.invoke(OBJECT_MAPPER.readTree(content), dirtyType,
label, logTag, identifier, e);
}
+
try {
- metricData.outputDirtyMetricsWithEstimate(
- jsonDynamicSchemaFormat.parse(rootNode, databasePattern),
- jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1,
- ((RowData) dirtyData).getBinary(0).length);
+ metricData.outputDirtyMetricsWithEstimate(database, table, 1,
+ content.getBytes(StandardCharsets.UTF_8).length);
} catch (Exception ex) {
metricData.invokeDirty(1,
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
}
@@ -645,15 +684,17 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
return;
}
String loadValue = null;
- RespContent respContent = null;
+ RespContent respContent;
try {
- loadValue = OBJECT_MAPPER.writeValueAsString(values);
+ // support csv and json format
+ String format =
executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY, FORMAT_JSON_VALUE);
+ loadValue = serialize(values, format);
respContent = load(tableIdentifier, loadValue);
try {
if (null != metricData && null != respContent) {
if (multipleSink) {
String[] tableWithDb = tableIdentifier.split("\\.");
- metricData.outputMetrics(tableWithDb[0], null,
tableWithDb[1],
+ metricData.outputMetrics(tableWithDb[0],
tableWithDb[1],
respContent.getNumberLoadedRows(),
respContent.getLoadBytes());
} else {
metricData.invoke(respContent.getNumberLoadedRows(),
respContent.getLoadBytes());
@@ -667,34 +708,122 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
values.clear();
} catch (Exception e) {
LOG.error(String.format("Flush table: %s error", tableIdentifier),
e);
- // Makesure it is a dirty data
- if (respContent == null ||
StringUtils.isNotBlank(respContent.getErrorURL())) {
- flushExceptionMap.put(tableIdentifier, e);
- errorNum.getAndAdd(values.size());
+ flushExceptionMap.put(tableIdentifier, e);
+ // may count repeatedly
+ errorNum.getAndAdd(values.size());
+
+ if (!multipleSink) {
+ try {
+ handleSingleTable(e, values, loadValue);
+ return;
+ } catch (Exception ex) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (SchemaUpdateExceptionPolicy.THROW_WITH_STOP ==
schemaUpdatePolicy) {
+ throw new RuntimeException(
+ String.format("Writing records to streamload of
tableIdentifier:%s failed, the value: %s.",
+ tableIdentifier, loadValue),
+ e);
+ }
+ if (SchemaUpdateExceptionPolicy.STOP_PARTIAL ==
schemaUpdatePolicy) {
+ errorTables.add(tableIdentifier);
+ LOG.warn("The tableIdentifier: {} load failed and the data
will be throw away in the future "
+ + "because the option
'sink.multiple.schema-update.policy' is 'STOP_PARTIAL'",
+ tableIdentifier);
+ return;
+ }
+ if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE ==
schemaUpdatePolicy) {
+ errorTables.add(tableIdentifier);
+ // archive dirty data when
'sink.multiple.schema-update.policy' is 'LOG_WITH_IGNORE'
for (Object value : values) {
try {
handleDirtyData(OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(value)),
DirtyType.BATCH_LOAD_ERROR, e);
- } catch (IOException ex) {
- if (!dirtyOptions.ignoreSideOutputErrors()) {
+ } catch (Exception ex) {
+ if
(!dirtySinkHelper.getDirtyOptions().ignoreSideOutputErrors()) {
throw new RuntimeException(ex);
}
LOG.warn("Dirty sink failed", ex);
}
}
- if (!ignoreSingleTableErrors) {
- throw new RuntimeException(
- String.format("Writing records to streamload of
tableIdentifier:%s failed, the value: %s.",
- tableIdentifier, loadValue),
- e);
+ }
+
+ values.clear();
+ }
+ }
+
+ private void handleSingleTable(Exception e, List values, String loadValue)
{
+ for (Object value : values) {
+ try {
+
handleDirtyData(OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsString(value)),
+ DirtyType.BATCH_LOAD_ERROR, e);
+ } catch (IOException ex) {
+ if
(!dirtySinkHelper.getDirtyOptions().ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
}
- errorTables.add(tableIdentifier);
- LOG.warn("The tableIdentifier: {} load failed and the data
will be throw away in the future"
- + " because the option
'sink.multiple.ignore-single-table-errors' is 'true'", tableIdentifier);
- } else {
- throw new RuntimeException(e);
+ LOG.warn("Dirty sink failed", ex);
}
}
+ if (!ignoreSingleTableErrors) {
+ throw new RuntimeException(
+ String.format("Writing records to streamload of
tableIdentifier:%s failed, the value: %s.",
+ tableIdentifier, loadValue),
+ e);
+ }
+ errorTables.add(tableIdentifier);
+ LOG.warn("The tableIdentifier: {} load failed and the data will be
throw away in the future"
+ + " because the option
'sink.multiple.ignore-single-table-errors' is 'true'", tableIdentifier);
+ }
+
+ /**
+ * format data to csv or json
+ *
+ * @param values
+ * @param format
+ * @return string
+ * @throws JsonProcessingException
+ */
+ private String serialize(List values, String format) throws
JsonProcessingException {
+ if (FORMAT_CSV_VALUE.equalsIgnoreCase(format)) {
+ LOG.info("doris data format: {}", format);
+ // set columns, and format json data to csv
+ String columns = null;
+ StringBuilder csvData = new StringBuilder();
+ for (Object item : values) {
+ if (item instanceof Map) {
+ Map<String, String> map = (Map<String, String>) item;
+ Set<String> fieldNameSet = new
LinkedHashSet<>(map.keySet());
+ if (columns == null) {
+ // when single table synchronizing, parameter
`sink.properties.columns` in options may
+ // contain hll or bitmap function.
+ // columns: dt,id,name,province,os, pv=hll_hash(id)
+ if (this.columns != null) {
+ for (String fieldName : this.columns.split(",")) {
+ if (fieldName.contains("=")) {
+ fieldNameSet.add(fieldName);
+ }
+ }
+ }
+ columns = StringUtils.join(fieldNameSet, ",");
+ executionOptions.getStreamLoadProp().put(COLUMNS_KEY,
columns);
+ }
+ int idx = 0;
+ int len = map.values().size();
+ for (String val : map.values()) {
+ csvData.append(null == val ? "\\N" : val);
+ if (idx++ < len - 1) {
+ csvData.append(this.fieldDelimiter);
+ }
+ }
+ csvData.append(this.lineDelimiter);
+ }
+ }
+ return csvData.toString();
+ } else {
+ return OBJECT_MAPPER.writeValueAsString(values);
+ }
}
@SuppressWarnings("rawtypes")
@@ -715,10 +844,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private RespContent load(String tableIdentifier, String result) throws
IOException {
String[] tableWithDb = tableIdentifier.split("\\.");
RespContent respContent = null;
- // Dynamic set COLUMNS_KEY for tableIdentifier every time for multiple
sink scenario
- if (multipleSink) {
- executionOptions.getStreamLoadProp().put(COLUMNS_KEY,
columnsMap.get(tableIdentifier));
- }
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
respContent = dorisStreamLoad.load(tableWithDb[0],
tableWithDb[1], result);
@@ -784,6 +909,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private String databasePattern;
private String tablePattern;
private boolean ignoreSingleTableErrors;
+ private SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private boolean multipleSink;
private String inlongMetric;
private String auditHostAndPorts;
@@ -837,8 +963,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
return this;
}
- public DorisDynamicSchemaOutputFormat.Builder setDynamicSchemaFormat(
- String dynamicSchemaFormat) {
+ public DorisDynamicSchemaOutputFormat.Builder
setDynamicSchemaFormat(String dynamicSchemaFormat) {
this.dynamicSchemaFormat = dynamicSchemaFormat;
return this;
}
@@ -883,6 +1008,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
return this;
}
+ public DorisDynamicSchemaOutputFormat.Builder setSchemaUpdatePolicy(
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy) {
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
+ return this;
+ }
+
@SuppressWarnings({"rawtypes"})
public DorisDynamicSchemaOutputFormat build() {
LogicalType[] logicalTypes = null;
@@ -891,9 +1022,22 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
.map(DataType::getLogicalType).toArray(LogicalType[]::new);
}
return new DorisDynamicSchemaOutputFormat(
-
optionsBuilder.setTableIdentifier(tableIdentifier).build(), readOptions,
executionOptions,
- tableIdentifier, logicalTypes, fieldNames,
dynamicSchemaFormat, databasePattern, tablePattern,
- ignoreSingleTableErrors, inlongMetric, auditHostAndPorts,
multipleSink, dirtyOptions, dirtySink);
+ optionsBuilder.setTableIdentifier(tableIdentifier).build(),
+ readOptions,
+ executionOptions,
+ tableIdentifier,
+ logicalTypes,
+ fieldNames,
+ dynamicSchemaFormat,
+ databasePattern,
+ tablePattern,
+ ignoreSingleTableErrors,
+ schemaUpdatePolicy,
+ inlongMetric,
+ auditHostAndPorts,
+ multipleSink,
+ dirtyOptions,
+ dirtySink);
}
}
}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index e5e482c29..7e0addaeb 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
import static
org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT;
@@ -63,6 +64,7 @@ import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTE
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
/**
@@ -147,7 +149,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions
.key("sink.batch.size")
.intType()
- .defaultValue(100)
+ .defaultValue(1024)
.withDescription("the flush max size (includes all append, upsert
and delete records), over this number"
+ " of records, will flush data. The default value is
100.");
private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
@@ -215,6 +217,7 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_ENABLE);
options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
+ options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
options.add(INLONG_METRIC);
options.add(INLONG_AUDIT);
options.add(FactoryUtil.SINK_PARALLELISM);
@@ -302,6 +305,8 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
String tablePattern =
helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
boolean multipleSink = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
boolean ignoreSingleTableErrors =
helper.getOptions().get(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy = helper.getOptions()
+
.getOptional(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY).orElse(SchemaUpdateExceptionPolicy.THROW_WITH_STOP);
String sinkMultipleFormat =
helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
multipleSink, sinkMultipleFormat, databasePattern,
tablePattern);
@@ -316,9 +321,18 @@ public final class DorisDynamicTableFactory implements
DynamicTableSourceFactory
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
- physicalSchema, multipleSink, sinkMultipleFormat,
databasePattern,
- tablePattern, ignoreSingleTableErrors, inlongMetric,
auditHostAndPorts, parallelism,
- dirtyOptions, dirtySink);
+ physicalSchema,
+ multipleSink,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ ignoreSingleTableErrors,
+ schemaUpdatePolicy,
+ inlongMetric,
+ auditHostAndPorts,
+ parallelism,
+ dirtyOptions,
+ dirtySink);
}
private void validateSinkMultiple(DataType physicalDataType, boolean
multipleSink, String sinkMultipleFormat,
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index 8e6527c12..e04bd6704 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.doris.internal.GenericDorisSinkFunction;
import javax.annotation.Nullable;
@@ -45,6 +46,7 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
private final String databasePattern;
private final String tablePattern;
private final boolean ignoreSingleTableErrors;
+ private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
private final String inlongMetric;
private final String auditHostAndPorts;
private final Integer parallelism;
@@ -60,6 +62,7 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
String databasePattern,
String tablePattern,
boolean ignoreSingleTableErrors,
+ SchemaUpdateExceptionPolicy schemaUpdatePolicy,
String inlongMetric,
String auditHostAndPorts,
Integer parallelism,
@@ -74,6 +77,7 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+ this.schemaUpdatePolicy = schemaUpdatePolicy;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.parallelism = parallelism;
@@ -107,6 +111,7 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
.setTablePattern(tablePattern)
.setDynamicSchemaFormat(sinkMultipleFormat)
.setIgnoreSingleTableErrors(ignoreSingleTableErrors)
+ .setSchemaUpdatePolicy(schemaUpdatePolicy)
.setDirtyOptions(dirtyOptions)
.setDirtySink(dirtySink);
return SinkFunctionProvider.of(
@@ -115,9 +120,21 @@ public class DorisDynamicTableSink implements
DynamicTableSink {
@Override
public DynamicTableSink copy() {
- return new DorisDynamicTableSink(options, readOptions,
executionOptions, tableSchema,
- multipleSink, sinkMultipleFormat, databasePattern,
tablePattern, ignoreSingleTableErrors,
- inlongMetric, auditHostAndPorts, parallelism, dirtyOptions,
dirtySink);
+ return new DorisDynamicTableSink(options,
+ readOptions,
+ executionOptions,
+ tableSchema,
+ multipleSink,
+ sinkMultipleFormat,
+ databasePattern,
+ tablePattern,
+ ignoreSingleTableErrors,
+ schemaUpdatePolicy,
+ inlongMetric,
+ auditHostAndPorts,
+ parallelism,
+ dirtyOptions,
+ dirtySink);
}
@Override
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
index deaa17588..a413e17da 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
@@ -120,6 +120,7 @@ public class DorisStreamLoad implements Serializable {
try {
final String loadUrlStr = String.format(LOAD_URL_PATTERN,
hostPort, db, tbl);
+ LOG.info("Streamload Url:{}", loadUrlStr);
HttpPut put = new HttpPut(loadUrlStr);
put.setHeader(HttpHeaders.EXPECT, "100-continue");
put.setHeader(HttpHeaders.AUTHORIZATION, this.authEncoding);
@@ -129,7 +130,9 @@ public class DorisStreamLoad implements Serializable {
put.setHeader(String.valueOf(entry.getKey()),
String.valueOf(entry.getValue()));
}
}
- put.setHeader("format", "json");
+ if (!put.containsHeader("format")) {
+ put.setHeader("format", "json");
+ }
put.setHeader("strip_outer_array", "true");
StringEntity entity = new StringEntity(value, "UTF-8");
put.setEntity(entity);