This is an automated email from the ASF dual-hosted git repository.
yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 09bae48fe [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
09bae48fe is described below
commit 09bae48fe06662bc8cd795a9272573a7ccd9135a
Author: yunqingmoswu <[email protected]>
AuthorDate: Thu Dec 1 14:47:54 2022 +0800
[INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
---
.../base/dirty/sink/log/LogDirtySinkFactory.java | 9 +-
.../base/dirty/sink/s3/S3DirtySinkFactory.java | 8 +-
.../base/dirty/utils/DirtySinkFactoryUtils.java | 48 ++++++
.../kafka/DynamicKafkaSerializationSchema.java | 161 ++++++++++++++++-----
.../apache/inlong/sort/kafka/KafkaDynamicSink.java | 18 ++-
.../table/DynamicKafkaDeserializationSchema.java | 59 +++++++-
.../sort/kafka/table/KafkaDynamicSource.java | 22 ++-
.../sort/kafka/table/KafkaDynamicTableFactory.java | 49 +++++--
.../table/UpsertKafkaDynamicTableFactory.java | 25 +++-
.../inlong/sort/parser/KafkaLoadSqlParseTest.java | 150 +++++++++++++++++++
10 files changed, 481 insertions(+), 68 deletions(-)
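The dirty data side-output introduced here is driven entirely by dirty.* table options. As a minimal sketch (sample values only, mirroring the options exercised by the new KafkaLoadSqlParseTest further below; not part of the patch itself), the option set for a 'log' side-output can be assembled like this:

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: dirty data side-output options as table properties.
// The keys come from the new test; the values are sample data.
public class DirtySideOutputOptionsSketch {
    public static Map<String, String> logSideOutputOptions() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("dirty.ignore", "true");               // skip dirty records instead of failing the job
        props.put("dirty.side-output.enable", "true");   // forward skipped records to a side output
        props.put("dirty.side-output.connector", "log"); // 'log' and 's3' dirty sinks are wired up in this change
        props.put("dirty.side-output.format", "csv");
        props.put("dirty.side-output.labels",
                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
        return props;
    }
}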
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index c3720a93d..b07b581f1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -18,6 +18,8 @@
package org.apache.inlong.sort.base.dirty.sink.log;
import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
@@ -38,9 +40,10 @@ public class LogDirtySinkFactory implements DirtySinkFactory {
@Override
public <T> DirtySink<T> createDirtySink(Context context) {
- FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
- String format = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
- String fieldDelimiter = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+ ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+ FactoryUtil.validateFactoryOptions(this, config);
+ String format = config.get(DIRTY_SIDE_OUTPUT_FORMAT);
+ String fieldDelimiter = config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
return new LogDirtySink<>(format, fieldDelimiter,
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
index d9ec26434..16310926c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.dirty.sink.s3;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
@@ -79,9 +80,10 @@ public class S3DirtySinkFactory implements DirtySinkFactory {
@Override
public <T> DirtySink<T> createDirtySink(Context context) {
- FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
- validate(context.getConfiguration());
- return new S3DirtySink<>(getS3Options(context.getConfiguration()),
+ ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+ FactoryUtil.validateFactoryOptions(this, config);
+ validate(config);
+ return new S3DirtySink<>(getS3Options(config),
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java
new file mode 100644
index 000000000..8f5fc9ff4
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+/**
+ * Dirty sink factory utils; it helps to create the dirty sink
+ */
+public final class DirtySinkFactoryUtils {
+
+ private DirtySinkFactoryUtils() {
+ }
+
+ public static <T> DirtySink<T> createDirtySink(Context context, DirtyOptions dirtyOptions) {
+ if (dirtyOptions == null) {
+ dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(context.getCatalogTable().getOptions()));
+ }
+ dirtyOptions.validate();
+ DirtySink<T> dirtySink = null;
+ if (dirtyOptions.ignoreDirty() && dirtyOptions.enableDirtySideOutput()) {
+ DirtySinkFactory dirtySinkFactory = FactoryUtil.discoverFactory(context.getClassLoader(),
+ DirtySinkFactory.class, dirtyOptions.getDirtyConnector());
+ dirtySink = dirtySinkFactory.createDirtySink(context);
+ }
+ return dirtySink;
+ }
+}
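For context, the connector factories below obtain the side-output through this helper while building a source or sink. A minimal sketch of that call pattern (following the usage added to KafkaDynamicTableFactory later in this patch; the surrounding factory code is omitted):

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;

// Sketch only: how a table factory wires up the dirty side-output.
public class DirtySideOutputWiringSketch {
    public static DirtySink<String> build(Context context, ReadableConfig tableOptions) {
        // Parse the dirty.* options from the table options
        DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
        // Discover and create the configured dirty sink ('log', 's3', ...); null when the side-output is disabled
        return DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
    }
}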
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
index be9c48480..6b0b733a9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.kafka;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
@@ -28,6 +29,10 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.kafka.KafkaDynamicSink.WritableMetadata;
@@ -36,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
@@ -80,6 +84,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
private final String sinkMultipleFormat;
private boolean multipleSink;
private JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
+ private final DirtyOptions dirtyOptions;
+ private final @Nullable DirtySink<Object> dirtySink;
private int[] partitions;
private int parallelInstanceId;
@@ -97,7 +103,9 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
int[] metadataPositions,
boolean upsertMode,
@Nullable String sinkMultipleFormat,
- @Nullable String topicPattern) {
+ @Nullable String topicPattern,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
if (upsertMode) {
Preconditions.checkArgument(
keySerialization != null && keyFieldGetters.length > 0,
@@ -114,6 +122,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
this.upsertMode = upsertMode;
this.sinkMultipleFormat = sinkMultipleFormat;
this.topicPattern = topicPattern;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
static RowData createProjectedRow(
@@ -135,9 +145,12 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
if (partitioner != null) {
partitioner.open(parallelInstanceId, numParallelInstances);
}
+ if (dirtySink != null) {
+ dirtySink.open(new Configuration());
+ }
// Only support dynamic topic when the topicPattern is specified
// and the valueSerialization is RawFormatSerializationSchema
- if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+ if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(sinkMultipleFormat)) {
multipleSink = true;
jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
@@ -148,19 +161,24 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
// shortcut in case no input projection is required
if (keySerialization == null && !hasMetadata) {
- final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
- return new ProducerRecord<>(
- getTargetTopic(consumedRow),
- extractPartition(consumedRow, null, valueSerialized),
- null,
- valueSerialized);
+ final byte[] valueSerialized = serializeWithDirtyHandle(consumedRow,
+ DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+ if (valueSerialized != null) {
+ return new ProducerRecord<>(
+ getTargetTopic(consumedRow),
+ extractPartition(consumedRow, null, valueSerialized),
+ null,
+ valueSerialized);
+ }
}
final byte[] keySerialized;
+ boolean mayDirtyData = false;
if (keySerialization == null) {
keySerialized = null;
} else {
final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
- keySerialized = keySerialization.serialize(keyRow);
+ keySerialized = serializeWithDirtyHandle(keyRow, DirtyType.KEY_SERIALIZE_ERROR, keySerialization);
+ mayDirtyData = keySerialized == null;
}
final byte[] valueSerialized;
@@ -173,10 +191,16 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
} else {
// make the message to be INSERT to be compliant with the INSERT-ONLY format
valueRow.setRowKind(RowKind.INSERT);
- valueSerialized = valueSerialization.serialize(valueRow);
+ valueSerialized = serializeWithDirtyHandle(valueRow,
+ DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+ mayDirtyData = mayDirtyData || valueSerialized == null;
}
} else {
- valueSerialized = valueSerialization.serialize(valueRow);
+ valueSerialized = serializeWithDirtyHandle(valueRow, DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+ mayDirtyData = mayDirtyData || valueSerialized == null;
+ }
+ if (mayDirtyData) {
+ return null;
}
return new ProducerRecord<>(
getTargetTopic(consumedRow),
@@ -187,6 +211,67 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
}
+ private byte[] serializeWithDirtyHandle(RowData consumedRow, DirtyType dirtyType,
+ SerializationSchema<RowData> serialization) {
+ if (!dirtyOptions.ignoreDirty()) {
+ return serialization.serialize(consumedRow);
+ }
+ byte[] value = null;
+ try {
+ value = serialization.serialize(consumedRow);
+ } catch (Exception e) {
+ LOG.error(String.format("serialize error, raw data: %s",
consumedRow.toString()), e);
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(consumedRow)
+ .setDirtyType(dirtyType)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+ return value;
+ }
+
+ private void serializeWithDirtyHandle(Map<String, Object> baseMap, JsonNode rootNode,
+ JsonNode dataNode, List<ProducerRecord<byte[], byte[]>> values) {
+ try {
+ byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+ values.add(new ProducerRecord<>(
+ jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+ extractPartition(null, null, data), null, data));
+ } catch (Exception e) {
+ LOG.error(String.format("serialize error, raw data: %s", baseMap),
e);
+ if (!dirtyOptions.ignoreDirty()) {
+ throw new RuntimeException(e);
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(dataNode)
+ .setDirtyType(DirtyType.VALUE_DESERIALIZE_ERROR)
+ .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+ .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+ .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+ }
+
/**
* Serialize for list; it is used for multiple sink scenes when a record contains multiple real records.
*
@@ -195,10 +280,14 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
* @return List of ProducerRecord
*/
public List<ProducerRecord<byte[], byte[]>> serializeForList(RowData consumedRow, @Nullable Long timestamp) {
+ List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
if (!multipleSink) {
- return Collections.singletonList(serialize(consumedRow, timestamp));
+ ProducerRecord<byte[], byte[]> value = serialize(consumedRow, timestamp);
+ if (value != null) {
+ values.add(value);
+ }
+ return values;
}
- List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
try {
JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(consumedRow.getBinary(0));
boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
@@ -220,9 +309,27 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
} else {
split2JsonArray(rootNode, updateBeforeNode, updateAfterNode, values);
}
- } catch (IOException e) {
- LOG.warn("deserialize error", e);
- values.add(new ProducerRecord<>(topic, null, null, consumedRow.getBinary(0)));
+ } catch (Exception e) {
+ LOG.error(String.format("serialize error, raw data: %s", new String(consumedRow.getBinary(0))), e);
+ if (!dirtyOptions.ignoreDirty()) {
+ throw new RuntimeException(e);
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(new String(consumedRow.getBinary(0)))
+ .setDirtyType(DirtyType.VALUE_DESERIALIZE_ERROR)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
}
return values;
}
@@ -261,27 +368,13 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
} else if (updateBeforeKey != null) {
baseMap.remove(updateBeforeKey);
}
- try {
- byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
- values.add(new ProducerRecord<>(
- jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
- extractPartition(null, null, data), null, data));
- } catch (Exception e) {
- throw new RuntimeException("serialize for list error", e);
- }
+ serializeWithDirtyHandle(baseMap, rootNode, updateAfterNode.get(i), values);
}
} else {
// In general, it will not run to this branch
for (int i = 0; i < updateBeforeNode.size(); i++) {
baseMap.put(updateBeforeKey, Collections.singletonList(updateBeforeNode.get(i)));
- try {
- byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
- values.add(new ProducerRecord<>(
- jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
- extractPartition(null, null, data), null, data));
- } catch (Exception e) {
- throw new RuntimeException("serialize for list error", e);
- }
+ serializeWithDirtyHandle(baseMap, rootNode, updateBeforeNode.get(i), values);
}
}
}
@@ -308,7 +401,7 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
// Extract the index '0' as the raw data is determined by the Raw format:
// The Raw format allows to read and write raw (byte based) values as a single column
return jsonDynamicSchemaFormat.parse(element.getBinary(0), topicPattern);
- } catch (IOException e) {
+ } catch (Exception e) {
// Ignore the parse error; it will return the default topic in the end.
LOG.warn("parse dynamic topic error", e);
}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
index acfa5d8f4..63a1593f9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.kafka.DynamicKafkaSerializationSchema.MetadataConverter;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
@@ -145,6 +147,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
*/
private final String auditHostAndPorts;
private @Nullable final String sinkMultipleFormat;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
/**
* Metadata that is appended at the end of a physical sink row.
*/
@@ -176,7 +180,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
String inlongMetric,
String auditHostAndPorts,
@Nullable String sinkMultipleFormat,
- @Nullable String topicPattern) {
+ @Nullable String topicPattern,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
// Format attributes
this.consumedDataType =
checkNotNull(consumedDataType, "Consumed data type must not be null.");
@@ -207,6 +213,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
this.auditHostAndPorts = auditHostAndPorts;
this.sinkMultipleFormat = sinkMultipleFormat;
this.topicPattern = topicPattern;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -310,7 +318,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
inlongMetric,
auditHostAndPorts,
sinkMultipleFormat,
- topicPattern);
+ topicPattern,
+ dirtyOptions,
+ dirtySink);
copy.metadataKeys = metadataKeys;
return copy;
}
@@ -422,7 +432,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
metadataPositions,
upsertMode,
sinkMultipleFormat,
- topicPattern);
+ topicPattern,
+ dirtyOptions,
+ dirtySink);
return new FlinkKafkaProducer<>(
topic,
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index dc218c900..5dd3fcaf7 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
import org.apache.flink.table.data.GenericRowData;
@@ -28,10 +29,17 @@ import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.SourceMetricData;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -43,6 +51,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicKafkaDeserializationSchema.class);
+
private final @Nullable DeserializationSchema<RowData> keyDeserialization;
private final DeserializationSchema<RowData> valueDeserialization;
@@ -57,6 +67,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
private final boolean upsertMode;
+ private final DirtyOptions dirtyOptions;
+ private final @Nullable DirtySink<String> dirtySink;
private SourceMetricData metricData;
DynamicKafkaDeserializationSchema(
@@ -68,7 +80,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
boolean hasMetadata,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
- boolean upsertMode) {
+ boolean upsertMode,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<String> dirtySink) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -87,6 +101,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
upsertMode);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
public void setMetricData(SourceMetricData metricData) {
@@ -99,6 +115,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
keyDeserialization.open(context);
}
valueDeserialization.open(context);
+ if (dirtySink != null) {
+ dirtySink.open(new Configuration());
+ }
}
@Override
@@ -117,7 +136,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
- valueDeserialization.deserialize(record.value(), collector);
+ deserializeWithDirtyHandle(record.value(), DirtyType.VALUE_DESERIALIZE_ERROR,
+ valueDeserialization, collector);
// output metrics
if (metricData != null) {
outputMetrics(record);
@@ -127,7 +147,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
// buffer key(s)
if (keyDeserialization != null) {
- keyDeserialization.deserialize(record.key(), keyCollector);
+ deserializeWithDirtyHandle(record.key(), DirtyType.KEY_DESERIALIZE_ERROR,
+ keyDeserialization, keyCollector);
}
// project output while emitting values
@@ -139,7 +160,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
- valueDeserialization.deserialize(record.value(), outputCollector);
+ deserializeWithDirtyHandle(record.value(), DirtyType.VALUE_DESERIALIZE_ERROR,
+ valueDeserialization, outputCollector);
// output metrics
if (metricData != null) {
outputMetrics(record);
@@ -149,6 +171,35 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
keyCollector.buffer.clear();
}
+ private void deserializeWithDirtyHandle(byte[] value, DirtyType dirtyType,
+ DeserializationSchema<RowData> deserialization, Collector<RowData> collector) throws IOException {
+ if (!dirtyOptions.ignoreDirty()) {
+ deserialization.deserialize(value, collector);
+ } else {
+ try {
+ deserialization.deserialize(value, collector);
+ } catch (IOException e) {
+ LOG.error(String.format("deserialize error, raw data: %s", new
String(value)), e);
+ if (dirtySink != null) {
+ DirtyData.Builder<String> builder = DirtyData.builder();
+ try {
+ builder.setData(new String(value))
+ .setDirtyType(dirtyType)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new IOException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+ }
+ }
+
private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
long dataSize = record.value() == null ? 0L : record.value().length;
if (metricData != null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 1d5932968..7525ffae1 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -43,6 +43,8 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.kafka.FlinkKafkaConsumer;
import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -51,7 +53,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -178,6 +179,9 @@ public class KafkaDynamicSource
protected final String auditHostAndPorts;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<String> dirtySink;
+
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicSource.class);
@@ -197,7 +201,9 @@ public class KafkaDynamicSource
long startupTimestampMillis,
boolean upsertMode,
final String inlongMetric,
- final String auditHostAndPorts) {
+ final String auditHostAndPorts,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<String> dirtySink) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
@@ -232,6 +238,8 @@ public class KafkaDynamicSource
this.upsertMode = upsertMode;
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -322,7 +330,11 @@ public class KafkaDynamicSource
startupMode,
specificStartupOffsets,
startupTimestampMillis,
- upsertMode, inlongMetric, auditHostAndPorts);
+ upsertMode,
+ inlongMetric,
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
@@ -427,7 +439,9 @@ public class KafkaDynamicSource
hasMetadata,
metadataConverters,
producedTypeInfo,
- upsertMode);
+ upsertMode,
+ dirtyOptions,
+ dirtySink);
final FlinkKafkaConsumer<RowData> kafkaConsumer;
if (topics != null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 5f9debe1d..b2e563193 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -51,6 +51,9 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.kafka.KafkaDynamicSink;
import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
@@ -95,6 +98,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -322,7 +326,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper);
- helper.validateExcept(PROPERTIES_PREFIX);
+ helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
validateTableSourceOptions(tableOptions);
@@ -354,7 +358,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
-
+ // Build the dirty data side-output
+ final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+ final DirtySink<String> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -369,7 +375,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
}
@Override
@@ -387,7 +395,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
getValueEncodingFormat(helper);
final String sinkMultipleFormat = getSinkMultipleFormat(helper);
- helper.validateExcept(PROPERTIES_PREFIX);
+ helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
validateSinkPartitioner(tableOptions);
validateSinkSemantic(tableOptions);
@@ -411,7 +419,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
-
+ // Build the dirty data side-output
+ final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+ final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
return createKafkaTableSink(
physicalDataType,
keyEncodingFormat.orElse(null),
@@ -428,13 +438,14 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
inlongMetric,
auditHostAndPorts,
sinkMultipleFormat,
- tableOptions.getOptional(TOPIC_PATTERN).orElse(null));
+ tableOptions.getOptional(TOPIC_PATTERN).orElse(null),
+ dirtyOptions,
+ dirtySink);
}
private void validateSinkMultipleFormatAndPhysicalDataType(DataType physicalDataType,
EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, String sinkMultipleFormat) {
- if (valueEncodingFormat instanceof RawFormatSerializationSchema
- && StringUtils.isNotBlank(sinkMultipleFormat)) {
+ if (multipleSink(valueEncodingFormat, sinkMultipleFormat)) {
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
if (!supportFormats.contains(sinkMultipleFormat)) {
@@ -451,6 +462,12 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
}
}
+ private boolean multipleSink(EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+ String sinkMultipleFormat) {
+ return valueEncodingFormat instanceof RawFormatSerializationSchema
+ && StringUtils.isNotBlank(sinkMultipleFormat);
+ }
+
// --------------------------------------------------------------------------------------------
protected KafkaDynamicSource createKafkaTableSource(
@@ -467,7 +484,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
String inlongMetric,
- String auditHostAndPorts) {
+ String auditHostAndPorts,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<String> dirtySink) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -483,7 +502,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
startupTimestampMillis,
false,
inlongMetric,
- auditHostAndPorts);
+ auditHostAndPorts,
+ dirtyOptions,
+ dirtySink);
}
protected KafkaDynamicSink createKafkaTableSink(
@@ -502,7 +523,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
String inlongMetric,
String auditHostAndPorts,
@Nullable String sinkMultipleFormat,
- @Nullable String topicPattern) {
+ @Nullable String topicPattern,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
return new KafkaDynamicSink(
physicalDataType,
physicalDataType,
@@ -522,6 +545,8 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
inlongMetric,
auditHostAndPorts,
sinkMultipleFormat,
- topicPattern);
+ topicPattern,
+ dirtyOptions,
+ dirtySink);
}
}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index b53565357..a5da814d0 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -45,6 +45,9 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.kafka.KafkaDynamicSink;
import java.time.Duration;
@@ -67,6 +70,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.aut
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
import static org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
@@ -200,7 +204,7 @@ public class UpsertKafkaDynamicTableFactory
helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
// Validate the option data type.
- helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+ helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX, DIRTY_PREFIX);
TableSchema schema = context.getCatalogTable().getSchema();
validateSource(tableOptions, keyDecodingFormat, valueDecodingFormat, schema);
@@ -210,7 +214,9 @@ public class UpsertKafkaDynamicTableFactory
Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
// always use earliest to keep data integrity
StartupMode earliest = StartupMode.EARLIEST;
-
+ // Build the dirty data side-output
+ final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+ final DirtySink<String> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
return new KafkaDynamicSource(
schema.toPhysicalRowDataType(),
keyDecodingFormat,
@@ -224,7 +230,11 @@ public class UpsertKafkaDynamicTableFactory
earliest,
Collections.emptyMap(),
0,
- true, null, null);
+ true,
+ null,
+ null,
+ dirtyOptions,
+ dirtySink);
}
@Override
@@ -241,7 +251,7 @@ public class UpsertKafkaDynamicTableFactory
helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT);
// Validate the option data type.
- helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+ helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX, DIRTY_PREFIX);
TableSchema schema = context.getCatalogTable().getSchema();
validateSink(tableOptions, keyEncodingFormat, valueEncodingFormat, schema);
@@ -258,6 +268,9 @@ public class UpsertKafkaDynamicTableFactory
new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+ // Build the dirty data side-output
+ final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+ final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
// use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
// it will use hash partition if key is set else in round-robin behaviour.
@@ -280,7 +293,9 @@ public class UpsertKafkaDynamicTableFactory
inlongMetric,
auditHostAndPorts,
null,
- null);
+ null,
+ dirtyOptions,
+ dirtySink);
}
private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
index 18b39eb57..52e3d6f6b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
@@ -45,6 +47,7 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -109,6 +112,97 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
"raw-hash", pattern);
}
+ private KafkaExtractNode buildDirtyKafkaExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(
+ new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+ "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+ JsonFormat jsonFormat = new JsonFormat();
+ jsonFormat.setIgnoreParseErrors(false);
+ return new KafkaExtractNode("1", "kafka_input", fields,
+ null, properties, "topic_dirty_input", "localhost:9092",
+ jsonFormat, KafkaScanStartupMode.EARLIEST_OFFSET, null,
+ "test_group", null, null);
+ }
+
+ private KafkaLoadNode buildDirtyKafkaLoadNode() {
+ List<FieldInfo> fields = Arrays.asList(
+ new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()));
+ List<FieldRelation> relations = Arrays.asList(
+ new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelation(new FieldInfo("name", new
StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "s3");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+ "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+ properties.put("dirty.side-output.s3.bucket", "s3-test-bucket");
+ properties.put("dirty.side-output.s3.endpoint", "s3.test.endpoint");
+ properties.put("dirty.side-output.s3.key", "dirty/test");
+ properties.put("dirty.side-output.s3.region", "region");
+ properties.put("dirty.side-output.s3.access-key-id", "access_key_id");
+ properties.put("dirty.side-output.s3.secret-key-id", "secret_key_id");
+ properties.put("dirty.identifier", "inlong-student-${SYSTEM_TIME}");
+ return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+ null, "topic_dirty_output", "localhost:9092", new JsonFormat(),
+ null, properties, null);
+ }
+
+ private KafkaExtractNode buildDirtyKafkaExtractNodeWithRawFormat() {
+ List<FieldInfo> fields = Collections.singletonList(new FieldInfo("raw", new VarBinaryFormatInfo()));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+ "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+ return new KafkaExtractNode("1", "kafka_input", fields,
+ null, properties, "topic_dirty_input", "localhost:9092",
+ new RawFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+ "test_group", null, null);
+ }
+
+ private KafkaLoadNode buildDirtyKafkaLoadNodeWithDynamicTopic() {
+ List<FieldInfo> fields = Collections.singletonList(new FieldInfo("raw", new VarBinaryFormatInfo()));
+ List<FieldRelation> relations = Collections.singletonList(
+ new FieldRelation(new FieldInfo("raw", new StringFormatInfo()),
+ new FieldInfo("raw", new StringFormatInfo())));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+ "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=${database}&table=${table}");
+ properties.put("dirty.identifier", "${database}-${table}-${SYSTEM_TIME}");
+ properties.put("dirty.side-output.s3.bucket", "s3-test-bucket");
+ properties.put("dirty.side-output.s3.endpoint", "s3.test.endpoint");
+ properties.put("dirty.side-output.s3.key", "dirty/test");
+ properties.put("dirty.side-output.s3.region", "region");
+ properties.put("dirty.side-output.s3.access-key-id", "access_key_id");
+ properties.put("dirty.side-output.s3.secret-key-id", "secret_key_id");
+ return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+ null, "topic_dirty_output", "localhost:9092", new RawFormat(),
+ null, properties, null, new CanalJsonFormat(), "topic_dirty_output",
+ null, null);
+ }
+
/**
* build node relation
*
@@ -233,4 +327,60 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
ParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}
+
+ /**
+ * Test dirty data handling for the kafka source and kafka sink.
+ * In this part it uses the 'log' side-output for the kafka source and the 's3' side-output for the kafka sink.
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testKafkaDirtyHandleSqlParse() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildDirtyKafkaExtractNode();
+ Node outputNode = buildDirtyKafkaLoadNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+
+ /**
+ * Test dirty data handling for the kafka source and kafka sink with a dynamic topic.
+ * In this part it uses the 'log' side-output for the kafka source and the 's3' side-output for the kafka sink.
+ * @throws Exception The exception may be thrown when executing
+ */
+ @Test
+ public void testKafkaDirtyHandleWithDynamicTopicSqlParse() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ env.disableOperatorChaining();
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ Node inputNode = buildDirtyKafkaExtractNodeWithRawFormat();
+ Node outputNode = buildDirtyKafkaLoadNodeWithDynamicTopic();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+ Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+ ParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
}