This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 501d5e63c [INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)
501d5e63c is described below
commit 501d5e63c15e649da442248d6ef026bf65ee22f7
Author: yunqingmoswu <[email protected]>
AuthorDate: Thu Oct 27 22:25:35 2022 +0800
[INLONG-6307][Sort] Add whether to ignore single-table error policy processing for multiple sink (#6308)
---
.../org/apache/inlong/sort/base/Constants.java | 6 ++
.../table/DorisDynamicSchemaOutputFormat.java | 66 +++++++++++++++-------
.../sort/doris/table/DorisDynamicTableFactory.java | 7 ++-
.../sort/doris/table/DorisDynamicTableSink.java | 12 ++--
4 files changed, 64 insertions(+), 27 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index fb542ef6d..19c58e9c1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -157,4 +157,10 @@ public final class Constants {
.enumType(SchemaUpdateExceptionPolicy.class)
.defaultValue(SchemaUpdateExceptionPolicy.TRY_IT_BEST)
.withDescription("The action to deal with schema update in
multiple sink.");
+
+ public static final ConfigOption<Boolean>
SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS =
+ ConfigOptions.key("sink.multiple.ignore-single-table-errors")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription("Whether ignore the single table erros
when multiple sink writing scenario.");
}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 8782ab63e..c2db85789 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -75,13 +75,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private final String databasePattern;
private final String tablePattern;
private final String dynamicSchemaFormat;
+ private final boolean ignoreSingleTableErrors;
+ private final transient Map<String, Exception> flushExceptionMap = new
HashMap<>();
private long batchBytes = 0L;
private int size;
private DorisStreamLoad dorisStreamLoad;
private transient volatile boolean closed = false;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
- private transient volatile Exception flushException;
private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
public DorisDynamicSchemaOutputFormat(DorisOptions option,
@@ -89,13 +90,15 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
DorisExecutionOptions executionOptions,
String dynamicSchemaFormat,
String databasePattern,
- String tablePattern) {
+ String tablePattern,
+ boolean ignoreSingleTableErrors) {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
this.dynamicSchemaFormat = dynamicSchemaFormat;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
+ this.ignoreSingleTableErrors = ignoreSingleTableErrors;
}
/**
@@ -130,26 +133,28 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() ->
{
synchronized (DorisDynamicSchemaOutputFormat.this) {
if (!closed) {
- try {
- flush();
- } catch (Exception e) {
- flushException = e;
- }
+ flush();
}
}
}, executionOptions.getBatchIntervalMs(),
executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
}
}
- private void checkFlushException() {
- if (flushException != null) {
- throw new RuntimeException("Writing records to streamload
failed.", flushException);
+ private boolean checkFlushException(String tableIdentifier) {
+ Exception flushException = flushExceptionMap.get(tableIdentifier);
+ if (flushException == null) {
+ return false;
}
+ if (!ignoreSingleTableErrors) {
+ throw new RuntimeException(
+ String.format("Writing records to streamload of
tableIdentifier:%s failed.", tableIdentifier),
+ flushException);
+ }
+ return true;
}
@Override
public synchronized void writeRecord(T row) throws IOException {
- checkFlushException();
addBatch(row);
boolean valid = (executionOptions.getBatchSize() > 0 && size >=
executionOptions.getBatchSize())
|| batchBytes >= executionOptions.getMaxBatchBytes();
@@ -169,9 +174,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return;
}
String tableIdentifier = StringUtils.join(
- jsonDynamicSchemaFormat.parse(rowData.getBinary(0),
databasePattern),
- ".",
- jsonDynamicSchemaFormat.parse(rowData.getBinary(0),
tablePattern));
+ jsonDynamicSchemaFormat.parse(rootNode, databasePattern),
".",
+ jsonDynamicSchemaFormat.parse(rootNode, tablePattern));
+ if (checkFlushException(tableIdentifier)) {
+ return;
+ }
List<RowKind> rowKinds = jsonDynamicSchemaFormat
.opType2RowKind(jsonDynamicSchemaFormat.getOpType(rootNode));
List<Map<String, String>> physicalDataList =
jsonDynamicSchemaFormat.jsonNode2Map(
@@ -247,18 +254,31 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
this.dorisStreamLoad.close();
}
}
- checkFlushException();
}
@SuppressWarnings({"rawtypes"})
- public synchronized void flush() throws IOException {
- checkFlushException();
+ public synchronized void flush() {
if (batchMap.isEmpty()) {
return;
}
for (Entry<String, List> kvs : batchMap.entrySet()) {
- load(kvs.getKey(),
OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+ if (checkFlushException(kvs.getKey())) {
+ continue;
+ }
+ try {
+ load(kvs.getKey(),
OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+ } catch (Exception e) {
+ flushExceptionMap.put(kvs.getKey(), e);
+ if (!ignoreSingleTableErrors) {
+ throw new RuntimeException(
+ String.format("Writing records to streamload of
tableIdentifier:%s failed.", kvs.getKey()),
+ e);
+ }
+ batchMap.remove(kvs.getKey());
+ }
}
+ batchBytes = 0;
+ size = 0;
}
private void load(String tableIdentifier, String result) throws
IOException {
@@ -284,8 +304,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
}
}
- batchBytes = 0;
- size = 0;
}
private String getBackend() throws IOException {
@@ -309,6 +327,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private String dynamicSchemaFormat;
private String databasePattern;
private String tablePattern;
+ private boolean ignoreSingleTableErrors;
public Builder() {
this.optionsBuilder =
DorisOptions.builder().setTableIdentifier("");
@@ -355,11 +374,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return this;
}
+ public DorisDynamicSchemaOutputFormat.Builder
setIgnoreSingleTableErrors(boolean ignoreSingleTableErrors) {
+ this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+ return this;
+ }
+
@SuppressWarnings({"rawtypes"})
public DorisDynamicSchemaOutputFormat build() {
return new DorisDynamicSchemaOutputFormat(
optionsBuilder.build(), readOptions, executionOptions,
- dynamicSchemaFormat, databasePattern, tablePattern);
+ dynamicSchemaFormat, databasePattern, tablePattern,
ignoreSingleTableErrors);
}
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index 271cc15b8..98c08c753 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -57,6 +57,7 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
+import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
/**
@@ -208,6 +209,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
options.add(SINK_MULTIPLE_DATABASE_PATTERN);
options.add(SINK_MULTIPLE_TABLE_PATTERN);
options.add(SINK_MULTIPLE_ENABLE);
+ options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
return options;
}
@@ -290,6 +292,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
String databasePattern =
helper.getOptions().getOptional(SINK_MULTIPLE_DATABASE_PATTERN).orElse(null);
String tablePattern =
helper.getOptions().getOptional(SINK_MULTIPLE_TABLE_PATTERN).orElse(null);
boolean multipleSink = helper.getOptions().get(SINK_MULTIPLE_ENABLE);
+ boolean ignoreSingleTableErrors =
helper.getOptions().get(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
String sinkMultipleFormat =
helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
multipleSink, sinkMultipleFormat, databasePattern,
tablePattern);
@@ -298,8 +301,8 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
- physicalSchema, multipleSink, sinkMultipleFormat,
databasePattern, tablePattern
- );
+ physicalSchema, multipleSink, sinkMultipleFormat,
databasePattern,
+ tablePattern, ignoreSingleTableErrors);
}
private void validateSinkMultiple(DataType physicalDataType, boolean
multipleSink, String sinkMultipleFormat,
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index b6499298c..bc847a398 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -41,6 +41,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
private final String sinkMultipleFormat;
private final String databasePattern;
private final String tablePattern;
+ private final boolean ignoreSingleTableErrors;
public DorisDynamicTableSink(DorisOptions options,
DorisReadOptions readOptions,
@@ -49,7 +50,8 @@ public class DorisDynamicTableSink implements DynamicTableSink {
boolean multipleSink,
String sinkMultipleFormat,
String databasePattern,
- String tablePattern) {
+ String tablePattern,
+ boolean ignoreSingleTableErrors) {
this.options = options;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
@@ -58,6 +60,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
this.sinkMultipleFormat = sinkMultipleFormat;
this.databasePattern = databasePattern;
this.tablePattern = tablePattern;
+ this.ignoreSingleTableErrors = ignoreSingleTableErrors;
}
@Override
@@ -92,14 +95,15 @@ public class DorisDynamicTableSink implements DynamicTableSink {
.setExecutionOptions(executionOptions)
.setDatabasePattern(databasePattern)
.setTablePattern(tablePattern)
- .setDynamicSchemaFormat(sinkMultipleFormat);
+ .setDynamicSchemaFormat(sinkMultipleFormat)
+ .setIgnoreSingleTableErrors(ignoreSingleTableErrors);
return OutputFormatProvider.of(builder.build());
}
@Override
public DynamicTableSink copy() {
- return new DorisDynamicTableSink(options, readOptions,
executionOptions,
- tableSchema, multipleSink, sinkMultipleFormat,
databasePattern, tablePattern);
+ return new DorisDynamicTableSink(options, readOptions,
executionOptions, tableSchema,
+ multipleSink, sinkMultipleFormat, databasePattern,
tablePattern, ignoreSingleTableErrors);
}
@Override