This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 179143610 [INLONG-6886][Sort] Add dirty message for doris sink (#6887)
179143610 is described below
commit 179143610e6fd49f3813151d680b9c513e9e82ae
Author: yunqingmoswu <[email protected]>
AuthorDate: Thu Dec 15 15:49:46 2022 +0800
[INLONG-6886][Sort] Add dirty message for doris sink (#6887)
---
.../sort/doris/table/DorisDynamicSchemaOutputFormat.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index fd8d8eacd..e5aeec53a 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -268,6 +268,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
if (metricOption != null) {
metricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
}
+ if (dirtySink != null) {
+ try {
+ dirtySink.open(new Configuration());
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
if (executionOptions.getBatchIntervalMs() != 0 &&
executionOptions.getBatchSize() != 1) {
this.scheduler = new ScheduledThreadPoolExecutor(1,
new
ExecutorThreadFactory("doris-streamload-output-format"));
@@ -358,12 +365,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
}
if (row instanceof RowData) {
RowData rowData = (RowData) row;
- JsonNode rootNode = null;
+ JsonNode rootNode;
try {
rootNode =
jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
} catch (Exception e) {
LOG.error(String.format("deserialize error, raw data: %s", new
String(rowData.getBinary(0))), e);
handleDirtyData(new String(rowData.getBinary(0)),
DirtyType.DESERIALIZE_ERROR, e);
+ return;
}
boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
if (isDDL) {
@@ -478,6 +486,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
.setDirtyType(dirtyType)
.setLabels(dirtyOptions.getLabels())
.setLogTag(dirtyOptions.getLogTag())
+ .setDirtyMessage(e.getMessage())
.setIdentifier(dirtyOptions.getIdentifier());
dirtySink.invoke(builder.build());
} catch (Exception ex) {