This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 08db5ea09 [INLONG-6765][Sort] Supports dirty data side-output for Iceberg sink (#6766)
08db5ea09 is described below
commit 08db5ea092f825ab44dace4534a9c67cc748800c
Author: yunqingmoswu <[email protected]>
AuthorDate: Thu Dec 8 09:54:36 2022 +0800
[INLONG-6765][Sort] Supports dirty data side-output for Iceberg sink (#6766)
---
.../org/apache/inlong/sort/base/Constants.java | 11 ++-
.../apache/inlong/sort/base/dirty/DirtyData.java | 41 +++++++-
.../apache/inlong/sort/base/dirty/DirtyType.java | 12 +++
.../sort/base/dirty/sink/log/LogDirtySink.java | 27 +++---
.../sort/base/dirty/sink/s3/S3DirtySink.java | 33 ++++---
.../inlong/sort/base/dirty/utils/FormatUtils.java | 30 ++++++
.../inlong/sort/base/sink/MultipleSinkOption.java | 1 -
.../inlong/sort/base/dirty/FormatUtilsTest.java | 4 +-
.../sort/iceberg/FlinkDynamicTableFactory.java | 13 ++-
.../inlong/sort/iceberg/IcebergTableSink.java | 16 +++-
.../apache/inlong/sort/iceberg/sink/FlinkSink.java | 35 +++++--
.../sink/multiple/DynamicSchemaHandleOperator.java | 106 ++++++++++++++++-----
.../sink/multiple/IcebergMultipleStreamWriter.java | 19 +++-
.../sink/multiple/IcebergSingleStreamWriter.java | 60 ++++++++++--
.../sort/parser/IcebergNodeSqlParserTest.java | 11 ++-
15 files changed, 328 insertions(+), 91 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 983b3c207..23676ec0f 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -227,11 +227,11 @@ public final class Constants {
.stringType()
.noDefaultValue()
.withDescription(
- "The identifier of dirty data, "
- + "it will be used for filename generation
of file dirty sink, "
+ "The identifier of dirty data, it will be used for
filename generation of file dirty sink, "
+ "topic generation of mq dirty sink,
tablename generation of database, etc."
+ "and it supports variable replace like
'${variable}'."
- + "There are two system
variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+ + "There are several system
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+ + "are currently supported, "
+ "and the support of other variables is
determined by the connector.");
public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_ENABLE =
ConfigOptions.key("dirty.side-output.enable")
@@ -266,7 +266,8 @@ public final class Constants {
.withDescription(
"The labels of dirty side-output, format is
'key1=value1&key2=value2', "
+ "it supports variable replace like
'${variable}',"
- + "There are two system
variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+ + "There are two system
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+ + "are currently supported,"
+ " and the support of other variables is
determined by the connector.");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LOG_TAG =
ConfigOptions.key("dirty.side-output.log-tag")
@@ -274,7 +275,7 @@ public final class Constants {
.defaultValue("DirtyData")
.withDescription(
"The log tag of dirty side-output, it supports
variable replace like '${variable}'."
- + "There are two system
variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+ + "There are two system
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,"
+ " and the support of other variables is
determined by the connector.");
    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER =
ConfigOptions.key("dirty.side-output.field-delimiter")
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index a8b84f2b4..16f1cb762 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -17,8 +17,10 @@
package org.apache.inlong.sort.base.dirty;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.inlong.sort.base.util.PatternReplaceUtils;
+import javax.annotation.Nullable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
@@ -33,6 +35,8 @@ public class DirtyData<T> {
private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+ private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+
private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
    private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@@ -41,7 +45,7 @@ public class DirtyData<T> {
     * The identifier of dirty data, it will be used for filename generation of file dirty sink,
     * topic generation of mq dirty sink, tablename generation of database, etc,
     * and it supports variable replace like '${variable}'.
-     * There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,
+     * There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
* and the support of other variables is determined by the connector.
*/
private final String identifier;
@@ -58,18 +62,31 @@ public class DirtyData<T> {
* Dirty type
*/
private final DirtyType dirtyType;
+ /**
+ * Dirty describe message, it is the cause of dirty data
+ */
+ private final String dirtyMessage;
+ /**
+ * The row type of data, it is only used for 'RowData'
+ */
+ private @Nullable final LogicalType rowType;
/**
* The real dirty data
*/
private final T data;
-    public DirtyData(T data, String identifier, String labels, String logTag, DirtyType dirtyType) {
+ public DirtyData(T data, String identifier, String labels,
+ String logTag, DirtyType dirtyType, String dirtyMessage,
+ @Nullable LogicalType rowType) {
this.data = data;
this.dirtyType = dirtyType;
+ this.dirtyMessage = dirtyMessage;
+ this.rowType = rowType;
Map<String, String> paramMap = genParamMap();
this.labels = PatternReplaceUtils.replace(labels, paramMap);
this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
+
}
public static <T> Builder<T> builder() {
@@ -80,6 +97,7 @@ public class DirtyData<T> {
Map<String, String> paramMap = new HashMap<>();
        paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now()));
paramMap.put(DIRTY_TYPE_KEY, dirtyType.format());
+ paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
return paramMap;
}
@@ -103,12 +121,19 @@ public class DirtyData<T> {
return identifier;
}
+ @Nullable
+ public LogicalType getRowType() {
+ return rowType;
+ }
+
public static class Builder<T> {
private String identifier;
private String labels;
private String logTag;
private DirtyType dirtyType = DirtyType.UNDEFINED;
+ private String dirtyMessage;
+ private LogicalType rowType;
private T data;
public Builder<T> setDirtyType(DirtyType dirtyType) {
@@ -136,8 +161,18 @@ public class DirtyData<T> {
return this;
}
+ public Builder<T> setDirtyMessage(String dirtyMessage) {
+ this.dirtyMessage = dirtyMessage;
+ return this;
+ }
+
+ public Builder<T> setRowType(LogicalType rowType) {
+ this.rowType = rowType;
+ return this;
+ }
+
public DirtyData<T> build() {
-            return new DirtyData<>(data, identifier, labels, logTag, dirtyType);
+            return new DirtyData<>(data, identifier, labels, logTag, dirtyType, dirtyMessage, rowType);
}
}
}
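As a usage sketch, the new builder fields slot in alongside the existing ones (the values are illustrative; rawRecord, rowType and e are assumed to be in scope):

    DirtyData.Builder<Object> builder = DirtyData.builder();
    builder.setData(rawRecord)
            .setDirtyType(DirtyType.EXTRACT_ROWDATA_ERROR)
            .setDirtyMessage(e.getMessage())     // fills the new ${DIRTY_MESSAGE} variable
            .setRowType(rowType)                 // lets file/log sinks re-derive field getters
            .setLabels("DIRTY_TYPE=${DIRTY_TYPE}&DIRTY_MESSAGE=${DIRTY_MESSAGE}")
            .setLogTag("DirtyData")
            .setIdentifier("iceberg-${SYSTEM_TIME}");
    DirtyData<Object> dirty = builder.build();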
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
index afe3e0d17..0637725c3 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -76,6 +76,18 @@ public enum DirtyType {
* Json process error
*/
JSON_PROCESS_ERROR("JsonProcessError"),
+ /**
+ * Table identifier parse error
+ */
+ TABLE_IDENTIFIER_PARSE_ERROR("TableIdentifierParseError"),
+ /**
+ * Extract schema error
+ */
+ EXTRACT_SCHEMA_ERROR("ExtractSchemaError"),
+ /**
+ * Extract RowData error
+ */
+ EXTRACT_ROWDATA_ERROR("ExtractRowDataError"),
;
private final String format;
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index a57c981fe..b7f7af914 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
/**
* Log dirty sink that is used to print log
@@ -48,7 +47,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class);
- private final RowData.FieldGetter[] fieldGetters;
+ private RowData.FieldGetter[] fieldGetters;
private final String format;
private final String fieldDelimiter;
private final DataType physicalRowDataType;
@@ -58,18 +57,13 @@ public class LogDirtySink<T> implements DirtySink<T> {
this.format = format;
this.fieldDelimiter = fieldDelimiter;
this.physicalRowDataType = physicalRowDataType;
- final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
-                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
- this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
- for (int i = 0; i < logicalTypes.length; i++) {
- fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
- }
}
@Override
public void open(Configuration configuration) throws Exception {
        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
                .createConverter(physicalRowDataType.getLogicalType());
+        fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
}
@Override
@@ -78,7 +72,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
        Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
T data = dirtyData.getData();
if (data instanceof RowData) {
- value = format((RowData) data, labelMap);
+ value = format((RowData) data, dirtyData.getRowType(), labelMap);
} else if (data instanceof JsonNode) {
value = format((JsonNode) data, labelMap);
} else {
@@ -88,14 +82,23 @@ public class LogDirtySink<T> implements DirtySink<T> {
LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
}
-    private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
+ private String format(RowData data, LogicalType rowType,
+ Map<String, String> labels) throws JsonProcessingException {
String value;
switch (format) {
case "csv":
-                value = FormatUtils.csvFormat(data, fieldGetters, labels, fieldDelimiter);
+                RowData.FieldGetter[] getters = fieldGetters;
+                if (rowType != null) {
+                    getters = FormatUtils.parseFieldGetters(rowType);
+                }
+                value = FormatUtils.csvFormat(data, getters, labels, fieldDelimiter);
break;
case "json":
- value = FormatUtils.jsonFormat(data, converter, labels);
+ RowDataToJsonConverter jsonConverter = converter;
+ if (rowType != null) {
+                    jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType);
+ }
+ value = FormatUtils.jsonFormat(data, jsonConverter, labels);
break;
default:
throw new UnsupportedOperationException(
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index 3cb20c7b8..ab8fc9464 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -24,9 +24,6 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
-import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -53,7 +50,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
/**
* S3 dirty sink that is used to sink dirty data to s3
@@ -72,7 +68,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
private final AtomicLong writeOutNum = new AtomicLong(0);
private final AtomicLong errorNum = new AtomicLong(0);
private final DataType physicalRowDataType;
- private final RowData.FieldGetter[] fieldGetters;
+ private RowData.FieldGetter[] fieldGetters;
private RowDataToJsonConverter converter;
private long batchBytes = 0L;
private int size;
@@ -85,18 +81,12 @@ public class S3DirtySink<T> implements DirtySink<T> {
public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
this.s3Options = s3Options;
this.physicalRowDataType = physicalRowDataType;
- final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
-                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
- this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
- for (int i = 0; i < logicalTypes.length; i++) {
- fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
- }
}
@Override
public void open(Configuration configuration) throws Exception {
-        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
-                .createConverter(physicalRowDataType.getLogicalType());
+        converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
+        fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
        AmazonS3 s3Client;
        if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) {
BasicAWSCredentials awsCreds =
@@ -149,7 +139,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
        Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
T data = dirtyData.getData();
if (data instanceof RowData) {
- value = format((RowData) data, labelMap);
+ value = format((RowData) data, dirtyData.getRowType(), labelMap);
} else if (data instanceof JsonNode) {
value = format((JsonNode) data, labelMap);
} else {
@@ -164,14 +154,23 @@ public class S3DirtySink<T> implements DirtySink<T> {
        batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new ArrayList<>()).add(value);
}
-    private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
+ private String format(RowData data, LogicalType rowType,
+ Map<String, String> labels) throws JsonProcessingException {
String value;
switch (s3Options.getFormat()) {
case "csv":
-                value = FormatUtils.csvFormat(data, fieldGetters, labels, s3Options.getFieldDelimiter());
+                RowData.FieldGetter[] getters = fieldGetters;
+                if (rowType != null) {
+                    getters = FormatUtils.parseFieldGetters(rowType);
+                }
+                value = FormatUtils.csvFormat(data, getters, labels, s3Options.getFieldDelimiter());
break;
case "json":
- value = FormatUtils.jsonFormat(data, converter, labels);
+ RowDataToJsonConverter jsonConverter = converter;
+ if (rowType != null) {
+                    jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType);
+ }
+ value = FormatUtils.jsonFormat(data, jsonConverter, labels);
break;
default:
throw new UnsupportedOperationException(
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
index 3220837dd..58260bae1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
@@ -17,18 +17,24 @@
package org.apache.inlong.sort.base.dirty.utils;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringJoiner;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
/**
* Format utils
@@ -49,6 +55,30 @@ public final class FormatUtils {
private FormatUtils() {
}
+ /**
+ * Parse FieldGetter from LogicalType
+ * @param rowType The row type
+ * @return A array of FieldGetter
+ */
+    public static RowData.FieldGetter[] parseFieldGetters(LogicalType rowType) {
+ List<LogicalType> logicalTypes = rowType.getChildren();
+        RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[logicalTypes.size()];
+ for (int i = 0; i < logicalTypes.size(); i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i);
+ }
+ return fieldGetters;
+ }
+
+ /**
+ * Parse RowDataToJsonConverter
+ * @param rowType The row type
+ * @return RowDataToJsonConverter
+ */
+    public static RowDataToJsonConverter parseRowDataToJsonConverter(LogicalType rowType) {
+        return new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+ .createConverter(rowType);
+ }
+
/**
* Csv format for 'RowData'
*
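A hedged usage sketch of the two new helpers, assuming csvFormat/jsonFormat keep the signatures seen at the call sites above (the schema and values are illustrative):

    import java.util.HashMap;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.types.logical.LogicalType;

    static void formatDemo() throws JsonProcessingException {
        // Row type: ROW<name STRING, age INT>
        LogicalType rowType = DataTypes.ROW(
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT())).getLogicalType();
        RowData row = GenericRowData.of(StringData.fromString("inlong"), 1);
        // Derive the field getters once per row type, then format the record
        RowData.FieldGetter[] getters = FormatUtils.parseFieldGetters(rowType);
        String csv = FormatUtils.csvFormat(row, getters, new HashMap<>(), ",");
        String json = FormatUtils.jsonFormat(row,
                FormatUtils.parseRowDataToJsonConverter(rowType), new HashMap<>());
    }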
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 4e3cfdf9b..d37d5dd28 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -149,7 +149,6 @@ public class MultipleSinkOption implements Serializable {
LOG.warn("Ignore table {} schema change: {}.", tableName,
tableChange);
return false;
}
-
throw new UnsupportedOperationException(
String.format("Unsupported table %s schema change: %s.",
tableName, tableChange));
}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
index 386766ddd..80363ba8b 100644
--- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.RowData.FieldGetter;
import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
import org.junit.Assert;
@@ -41,7 +40,6 @@ import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.flink.table.data.RowData.createFieldGetter;
/**
@@ -64,7 +62,7 @@ public class FormatUtilsTest {
Column.physical("name", DataTypes.STRING()),
Column.physical("age", DataTypes.INT()));
List<LogicalType> logicalTypes = schema.toPhysicalRowDataType()
-                .getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList());
+ .getLogicalType().getChildren();
fieldGetters = new RowData.FieldGetter[logicalTypes.size()];
for (int i = 0; i < logicalTypes.size(); i++) {
fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 11ed1113f..8553ce94a 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -45,6 +45,9 @@ import org.apache.iceberg.flink.IcebergTableSource;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import java.util.Map;
import java.util.Set;
@@ -224,12 +227,15 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
Map<String, String> tableProps = catalogTable.getOptions();
        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
        ActionsProvider actionsLoader = createActionLoader(context.getClassLoader(), tableProps);
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(tableProps));
+        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
boolean multipleSink = Boolean.parseBoolean(
                tableProps.getOrDefault(SINK_MULTIPLE_ENABLE.key(), SINK_MULTIPLE_ENABLE.defaultValue().toString()));
if (multipleSink) {
CatalogLoader catalogLoader = createCatalogLoader(tableProps);
-            return new IcebergTableSink(null, tableSchema, catalogTable, catalogLoader, actionsLoader);
+ return new IcebergTableSink(null, tableSchema, catalogTable,
+ catalogLoader, actionsLoader, dirtyOptions, dirtySink);
} else {
TableLoader tableLoader;
if (catalog != null) {
@@ -238,7 +244,8 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
                tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(),
objectPath.getObjectName());
}
-            return new IcebergTableSink(tableLoader, tableSchema, catalogTable, null, actionsLoader);
+ return new IcebergTableSink(tableLoader, tableSchema, catalogTable,
+ null, actionsLoader, dirtyOptions, dirtySink);
}
}
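A sketch of reusing this wiring from another connector's createDynamicTableSink() (DirtyOptions.fromConfig and DirtySinkFactoryUtils.createDirtySink are the helpers used in this diff; the surrounding method is assumed, and the sinks treat the created DirtySink as @Nullable, suggesting it may be null when the side-output is disabled):

    // Inside a DynamicTableSinkFactory#createDynamicTableSink(Context context):
    Map<String, String> tableProps = context.getCatalogTable().getOptions();
    DirtyOptions dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(tableProps));
    DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
    // ... pass both into the sink, as IcebergTableSink does below.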
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index bd65c76e9..31698c0de 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -35,11 +35,14 @@ import org.apache.iceberg.actions.ActionsProvider;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.iceberg.sink.FlinkSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@@ -74,6 +77,9 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
private boolean overwrite = false;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
+
private IcebergTableSink(IcebergTableSink toCopy) {
this.tableLoader = toCopy.tableLoader;
this.tableSchema = toCopy.tableSchema;
@@ -81,18 +87,24 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
this.catalogTable = toCopy.catalogTable;
this.catalogLoader = toCopy.catalogLoader;
this.actionsProvider = toCopy.actionsProvider;
+ this.dirtyOptions = toCopy.dirtyOptions;
+ this.dirtySink = toCopy.dirtySink;
}
public IcebergTableSink(TableLoader tableLoader,
TableSchema tableSchema,
CatalogTable catalogTable,
CatalogLoader catalogLoader,
- ActionsProvider actionsProvider) {
+ ActionsProvider actionsProvider,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.tableLoader = tableLoader;
this.tableSchema = tableSchema;
this.catalogTable = catalogTable;
this.catalogLoader = catalogLoader;
this.actionsProvider = actionsProvider;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -130,6 +142,8 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
.overwrite(overwrite)
.appendMode(tableOptions.get(IGNORE_ALL_CHANGELOG))
                .metric(tableOptions.get(INLONG_METRIC), tableOptions.get(INLONG_AUDIT))
+ .dirtyOptions(dirtyOptions)
+ .dirtySink(dirtySink)
.action(actionsProvider)
.append();
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index dd1c43275..a32db160a 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -52,6 +52,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleFilesCommiter;
import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleStreamWriter;
@@ -64,6 +66,7 @@ import org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
@@ -166,6 +169,8 @@ public class FlinkSink {
private CatalogLoader catalogLoader = null;
private boolean multipleSink = false;
private MultipleSinkOption multipleSinkOption = null;
+ private DirtyOptions dirtyOptions;
+ private @Nullable DirtySink<Object> dirtySink;
private Builder() {
}
@@ -283,6 +288,16 @@ public class FlinkSink {
return this;
}
+ public Builder dirtyOptions(DirtyOptions dirtyOptions) {
+ this.dirtyOptions = dirtyOptions;
+ return this;
+ }
+
+ public Builder dirtySink(DirtySink<Object> dirtySink) {
+ this.dirtySink = dirtySink;
+ return this;
+ }
+
/**
         * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink support
* {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
@@ -514,7 +529,8 @@ public class FlinkSink {
}
            IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric, auditHostAndPorts);
+                    table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric,
+                    auditHostAndPorts, dirtyOptions, dirtySink);
            int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -534,8 +550,7 @@ public class FlinkSink {
            int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
            DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
-                    catalogLoader,
-                    multipleSinkOption);
+                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink);
SingleOutputStreamOperator<RecordWithSchema> routeStream = input
.transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
TypeInformation.of(RecordWithSchema.class),
@@ -544,7 +559,8 @@ public class FlinkSink {
            IcebergProcessOperator streamWriter = new IcebergProcessOperator(new IcebergMultipleStreamWriter(
-                    appendMode, catalogLoader, inlongMetric, auditHostAndPorts, multipleSinkOption));
+                    appendMode, catalogLoader, inlongMetric, auditHostAndPorts,
+                    multipleSinkOption, dirtyOptions, dirtySink));
            SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream
.transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
TypeInformation.of(IcebergProcessOperator.class),
@@ -618,7 +634,10 @@ public class FlinkSink {
boolean upsert,
boolean appendMode,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
+ // flink A, iceberg a
        Preconditions.checkArgument(table != null, "Iceberg table should't be null");
Map<String, String> props = table.properties();
long targetFileSize = getTargetFileSizeBytes(props);
@@ -628,9 +647,11 @@ public class FlinkSink {
        TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
                serializableTable, serializableTable.schema(), flinkRowType, targetFileSize,
fileFormat, equalityFieldIds, upsert, appendMode);
-
+ // Set null for flinkRowType of IcebergSingleStreamWriter
+ // to avoid frequent Field.Getter creation in dirty data sink.
return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
-                table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts));
+                table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts,
+ null, dirtyOptions, dirtySink));
}
private static FileFormat getFileFormat(Map<String, String> properties) {
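For reference, a hedged sketch of the builder chain with the new hooks, modeled on the call in IcebergTableSink above (forRowData, table, tableLoader and the other setters are assumed from the existing builder API; only dirtyOptions/dirtySink are new in this commit):

    FlinkSink.forRowData(dataStream)
            .table(table)
            .tableLoader(tableLoader)
            .overwrite(false)
            .appendMode(false)
            .metric(inlongMetric, inlongAudit)
            .dirtyOptions(dirtyOptions)   // new: fail-or-ignore policy for dirty records
            .dirtySink(dirtySink)         // new: optional side-output destination
            .action(actionsProvider)
            .append();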
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index f8422add6..571221257 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -19,7 +19,6 @@
package org.apache.inlong.sort.iceberg.sink.multiple;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -39,6 +38,10 @@ import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.sink.MultipleSinkOption;
@@ -47,6 +50,7 @@ import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -63,10 +67,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
OneInputStreamOperator<RowData, RecordWithSchema>,
ProcessingTimeCallback {
-    private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaHandleOperator.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSchemaHandleOperator.class);
private static final long HELPER_DEBUG_INTERVEL = 10 * 60 * 1000;
+ private static final long serialVersionUID = 1L;
- private final ObjectMapper objectMapper = new ObjectMapper();
private final CatalogLoader catalogLoader;
private final MultipleSinkOption multipleSinkOption;
@@ -84,18 +88,26 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
// blacklist to filter schema update failed table
private transient Set<TableIdentifier> blacklist;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
+
public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
- MultipleSinkOption multipleSinkOption) {
+ MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.catalogLoader = catalogLoader;
this.multipleSinkOption = multipleSinkOption;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
+ @SuppressWarnings("unchecked")
@Override
public void open() throws Exception {
super.open();
this.catalog = catalogLoader.loadCatalog();
this.asNamespaceCatalog =
                catalog instanceof SupportsNamespaces ? (SupportsNamespaces) catalog : null;
+
        this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(
                multipleSinkOption.getFormat(), multipleSinkOption.getFormatOption());
@@ -118,13 +130,25 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
@Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
-        JsonNode jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
-
- TableIdentifier tableId = parseId(jsonNode);
+ JsonNode jsonNode = null;
+ try {
+            jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
+ } catch (Exception e) {
+ LOGGER.error(String.format("Deserialize error, raw data: %s",
+ new String(element.getValue().getBinary(0))), e);
+ handleDirtyData(new String(element.getValue().getBinary(0)),
+ null, DirtyType.DESERIALIZE_ERROR, e);
+ }
+ TableIdentifier tableId = null;
+ try {
+ tableId = parseId(jsonNode);
+ } catch (Exception e) {
+ LOGGER.error(String.format("Table identifier parse error, raw
data: %s", jsonNode), e);
+ handleDirtyData(jsonNode, jsonNode,
DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e);
+ }
if (blacklist.contains(tableId)) {
return;
}
-
boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
if (isDDL) {
execDDL(jsonNode, tableId);
@@ -133,6 +157,41 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
}
}
+    private void handleDirtyData(Object dirtyData, JsonNode rootNode, DirtyType dirtyType, Exception e) {
+ if (!dirtyOptions.ignoreDirty()) {
+ RuntimeException ex;
+ if (e instanceof RuntimeException) {
+ ex = (RuntimeException) e;
+ } else {
+ ex = new RuntimeException(e);
+ }
+ throw ex;
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setDirtyMessage(e.getMessage());
+ if (rootNode != null) {
+                builder.setLabels(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+                        .setLogTag(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+                        .setIdentifier(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+ } else {
+ builder.setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ }
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+
@Override
public void onProcessingTime(long timestamp) {
LOG.info("Black list table: {} at time {}.", blacklist, timestamp);
@@ -146,6 +205,9 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
RecordWithSchema record = parseRecord(jsonNode, tableId);
+ if (record == null) {
+ return;
+ }
Schema schema = schemaCache.get(record.getTableId());
Schema dataSchema = record.getSchema();
recordQueues.compute(record.getTableId(), (k, v) -> {
@@ -155,7 +217,6 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
v.add(record);
return v;
});
-
if (schema == null) {
            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
} else {
@@ -182,6 +243,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
LOG.warn("Ignore table {} schema change, old:
{} new: {}.",
tableId, dataSchema, latestSchema, e);
blacklist.add(tableId);
+ handleDirtyData(jsonNode, jsonNode,
DirtyType.EXTRACT_ROWDATA_ERROR, e);
}
return Collections.emptyList();
});
@@ -204,13 +266,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
LOG.warn("Database({}) already exist in Catalog({})!",
tableId.namespace(), catalog.name());
}
}
-
        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
properties.put("format-version", "2");
properties.put("write.upsert.enabled", "true");
// for hive visible
properties.put("engine.hive.enabled", "true");
-
try {
            catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), properties.build());
            LOG.info("Auto create Table({}) in Database({}) in Catalog({})!",
@@ -220,13 +280,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
tableId.name(), tableId.namespace(), catalog.name());
}
}
-
handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
}
    private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, Schema oldSchema, Schema newSchema) {
Table table = catalog.loadTable(tableId);
-
        // The transactionality of changes is guaranteed by comparing the old schema with the current schema of the
        // table.
        // Judging whether changes can be made by schema comparison (currently only column additions are supported),
@@ -263,15 +321,18 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
}
    private RecordWithSchema parseRecord(JsonNode data, TableIdentifier tableId) {
-        List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data);
- RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr);
-
- RecordWithSchema record = new RecordWithSchema(
- data,
- FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(schema)),
- tableId,
- pkListStr);
- return record;
+ try {
+            List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data);
+            RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr);
+ return new RecordWithSchema(
+ data,
+ FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(schema)),
+ tableId,
+ pkListStr);
+ } catch (Exception e) {
+ handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e);
+ }
+ return null;
}
    private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
@@ -290,7 +351,6 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
break;
}
}
-
return canHandle;
}
}
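Taken together with the writer changes below, the error-handling contract is: the 'dirty.ignore' option (as set in the parser test at the end) decides whether a dirty record fails the job, and ignoreSideOutputErrors() decides whether a failing side-output is itself fatal. A condensed sketch of the pattern used in this diff (process() stands in for the real deserialize/parse/write step):

    try {
        process(record);
    } catch (Exception e) {
        if (!dirtyOptions.ignoreDirty()) {
            throw e; // dirty data is fatal
        }
        if (dirtySink != null) {
            try {
                DirtyData.Builder<Object> builder = DirtyData.builder();
                builder.setData(record)
                        .setDirtyType(DirtyType.UNDEFINED) // a more specific type in practice
                        .setDirtyMessage(e.getMessage());
                dirtySink.invoke(builder.build());
            } catch (Exception ex) {
                if (!dirtyOptions.ignoreSideOutputErrors()) {
                    throw new RuntimeException(ex);
                }
                LOG.warn("Dirty sink failed", ex);
            }
        }
    }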
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 16bedfd83..9246ffeed 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -38,6 +39,8 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
@@ -95,18 +98,24 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
private transient SinkMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
public IcebergMultipleStreamWriter(
boolean appendMode,
CatalogLoader catalogLoader,
String inlongMetric,
String auditHostAndPorts,
- MultipleSinkOption multipleSinkOption) {
+ MultipleSinkOption multipleSinkOption,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.appendMode = appendMode;
this.catalogLoader = catalogLoader;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.multipleSinkOption = multipleSinkOption;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -182,11 +191,11 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
.map(NestedField::fieldId)
.collect(Collectors.toList());
}
-
+        RowType flinkRowType = FlinkSchemaUtil.convert(recordWithSchema.getSchema());
        TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
table,
recordWithSchema.getSchema(),
- FlinkSchemaUtil.convert(recordWithSchema.getSchema()),
+ flinkRowType,
targetFileSizeBytes,
fileFormat,
equalityFieldIds,
@@ -195,7 +204,8 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
if (multipleWriters.get(tableId) == null) {
            IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>(
- tableId.toString(), taskWriterFactory, null, null);
+ tableId.toString(), taskWriterFactory, null,
+ null, flinkRowType, dirtyOptions, dirtySink);
writer.setup(getRuntimeContext(),
new CallbackCollector<>(
                            writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))),
@@ -206,6 +216,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi
} else { // only if second times schema will evolute
            // Refresh new schema maybe cause previous file writer interrupted, so here should handle it
multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
+ multipleWriters.get(tableId).setFlinkRowType(flinkRowType);
}
}
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index 8d936ffd8..761306a6e 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -26,15 +26,21 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.flink.sink.TaskWriterFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
@@ -48,6 +54,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
CheckpointedFunction,
SchemaEvolutionFunction<TaskWriterFactory<T>> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSingleStreamWriter.class);
+
private static final long serialVersionUID = 1L;
private final String fullTableName;
@@ -58,20 +66,28 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
private transient TaskWriter<T> writer;
private transient int subTaskId;
private transient int attemptId;
- @Nullable
- private transient SinkMetricData metricData;
+ private @Nullable transient SinkMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
+ private @Nullable RowType flinkRowType;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
public IcebergSingleStreamWriter(
String fullTableName,
TaskWriterFactory<T> taskWriterFactory,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ @Nullable RowType flinkRowType,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.fullTableName = fullTableName;
this.taskWriterFactory = taskWriterFactory;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.flinkRowType = flinkRowType;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -81,7 +97,6 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
// Initialize the task writer factory.
this.taskWriterFactory.initialize(subTaskId, attemptId);
-
// Initialize the task writer.
this.writer = taskWriterFactory.create();
@@ -102,17 +117,38 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
// close all open files and emit files to downstream committer operator
emit(writer.complete());
-
this.writer = taskWriterFactory.create();
}
@Override
- public void processElement(T value)
- throws Exception {
- writer.write(value);
-
+ public void processElement(T value) throws Exception {
+ try {
+ writer.write(value);
+ } catch (Exception e) {
+ LOGGER.error(String.format("write error, raw data: %s", value), e);
+ if (!dirtyOptions.ignoreDirty()) {
+ throw e;
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(value)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setIdentifier(dirtyOptions.getIdentifier())
+ .setRowType(flinkRowType)
+ .setDirtyMessage(e.getMessage());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOGGER.warn("Dirty sink failed", ex);
+ }
+ }
+ }
if (metricData != null) {
- metricData.invokeWithEstimate(value);
+ metricData.invokeWithEstimate(value == null ? "" : value);
}
}
@@ -131,6 +167,10 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
}
}
+ public void setFlinkRowType(@Nullable RowType flinkRowType) {
+ this.flinkRowType = flinkRowType;
+ }
+
@Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (metricData != null && metricStateListState != null) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index 50388dcb4..0602868f2 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -44,6 +44,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -116,7 +117,13 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo())),
new FieldRelation(new FieldInfo("ts", new
TimestampFormatInfo()),
new FieldInfo("ts", new
TimestampFormatInfo())));
-
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_iceberg");
// set HIVE_CONF_DIR,or set uri and warehouse
return new IcebergLoadNode(
"iceberg",
@@ -126,7 +133,7 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase {
null,
null,
null,
- null,
+ properties,
"inlong",
"inlong_iceberg",
null,