This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8d6bd8c08 [INLONG-6797][Sort] Supports dirty data side-output for filesystem sink (#6799)
8d6bd8c08 is described below
commit 8d6bd8c0825c7f1f571419bb05c68f982965cd59
Author: yunqingmoswu <[email protected]>
AuthorDate: Fri Dec 9 14:11:43 2022 +0800
[INLONG-6797][Sort] Supports dirty data side-output for filesystem sink (#6799)
---
.../sort/filesystem/FileSystemTableFactory.java | 12 ++++-
.../sort/filesystem/FileSystemTableSink.java | 27 +++++++---
.../filesystem/stream/AbstractStreamingWriter.java | 58 ++++++++++++++++++----
.../filesystem/stream/StreamingFileWriter.java | 9 +++-
.../sort/filesystem/stream/StreamingSink.java | 17 +++++--
.../stream/compact/CompactFileWriter.java | 10 +++-
.../inlong/sort/parser/FlinkSqlParserTest.java | 11 +++-
7 files changed, 115 insertions(+), 29 deletions(-)
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
index 58c585ebc..f27d55ec7 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableFactory.java
@@ -39,6 +39,9 @@ import
org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.FileSystemTableSource;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import java.util.HashSet;
import java.util.LinkedList;
@@ -48,6 +51,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static java.time.ZoneId.SHORT_IDS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG;
import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
@@ -86,13 +90,17 @@ public class FileSystemTableFactory implements
DynamicTableSourceFactory, Dynami
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
validate(helper);
+ final DirtyOptions dirtyOptions =
DirtyOptions.fromConfig(helper.getOptions());
+ final DirtySink<Object> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
return new FileSystemTableSink(
context,
discoverDecodingFormat(context, BulkReaderFormatFactory.class),
discoverDecodingFormat(context,
DeserializationFormatFactory.class),
discoverFormatFactory(context),
discoverEncodingFormat(context, BulkWriterFormatFactory.class),
- discoverEncodingFormat(context,
SerializationFormatFactory.class));
+ discoverEncodingFormat(context,
SerializationFormatFactory.class),
+ dirtyOptions,
+ dirtySink);
}
@Override
@@ -132,7 +140,7 @@ public class FileSystemTableFactory implements
DynamicTableSourceFactory, Dynami
private void validate(FactoryUtil.TableFactoryHelper helper) {
// Except format options, some formats like parquet and orc can not
list all supported
// options.
- helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) +
".");
+ helper.validateExcept(helper.getOptions().get(FactoryUtil.FORMAT) +
".", DIRTY_PREFIX);
// validate time zone of watermark
String watermarkTimeZone =
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
index 32f35b0e1..6f344098a 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/FileSystemTableSink.java
@@ -77,6 +77,8 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.filesystem.stream.StreamingSink;
import javax.annotation.Nullable;
@@ -129,10 +131,12 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
private LinkedHashMap<String, String> staticPartitions = new
LinkedHashMap<>();
@Nullable
- private Integer configuredParallelism;
+ private final Integer configuredParallelism;
- private String inlongMetric;
- private String inlongAudit;
+ private final String inlongMetric;
+ private final String inlongAudit;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
FileSystemTableSink(
DynamicTableFactory.Context context,
@@ -140,7 +144,9 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
@Nullable DecodingFormat<DeserializationSchema<RowData>>
deserializationFormat,
@Nullable FileSystemFormatFactory formatFactory,
@Nullable EncodingFormat<BulkWriter.Factory<RowData>>
bulkWriterFormat,
- @Nullable EncodingFormat<SerializationSchema<RowData>>
serializationFormat) {
+ @Nullable EncodingFormat<SerializationSchema<RowData>>
serializationFormat,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
super(context);
this.bulkReaderFormat = bulkReaderFormat;
this.deserializationFormat = deserializationFormat;
@@ -159,6 +165,8 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
this.configuredParallelism =
tableOptions.get(FileSystemOptions.SINK_PARALLELISM);
this.inlongMetric = tableOptions.get(INLONG_METRIC);
this.inlongAudit = tableOptions.get(INLONG_AUDIT);
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
}
@Override
@@ -283,11 +291,14 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
compactionSize,
parallelism,
inlongMetric,
- inlongAudit);
+ inlongAudit,
+ dirtyOptions,
+ dirtySink);
} else {
writerStream =
StreamingSink.writer(
- dataStream, bucketCheckInterval, bucketsBuilder,
parallelism, inlongMetric, inlongAudit);
+ dataStream, bucketCheckInterval, bucketsBuilder,
parallelism,
+ inlongMetric, inlongAudit, dirtyOptions,
dirtySink);
}
return StreamingSink.sink(
@@ -543,7 +554,9 @@ public class FileSystemTableSink extends
AbstractFileSystemTable
deserializationFormat,
formatFactory,
bulkWriterFormat,
- serializationFormat);
+ serializationFormat,
+ dirtyOptions,
+ dirtySink);
sink.overwrite = overwrite;
sink.dynamicGrouping = dynamicGrouping;
sink.staticPartitions = staticPartitions;
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index e5e74e62a..cc85113b8 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -36,12 +37,20 @@ import
org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.inlong.sort.base.metric.MetricState;
import org.apache.inlong.sort.base.metric.SinkMetricData;
import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
@@ -61,6 +70,8 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
private static final long serialVersionUID = 1L;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractStreamingWriter.class);
+
// ------------------------ configuration fields --------------------------
private final long bucketCheckInterval;
@@ -69,6 +80,8 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
private final String inlongMetric;
private final String inlongAudit;
+ private final DirtyOptions dirtyOptions;
+ private @Nullable final DirtySink<Object> dirtySink;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
@@ -88,11 +101,15 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
public AbstractStreamingWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<IN, String, ? extends
StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder,
- String inlongMetric, String inlongAudit) {
+ String inlongMetric, String inlongAudit,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
this.bucketCheckInterval = bucketCheckInterval;
this.bucketsBuilder = bucketsBuilder;
this.inlongMetric = inlongMetric;
this.inlongAudit = inlongAudit;
+ this.dirtyOptions = dirtyOptions;
+ this.dirtySink = dirtySink;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -130,6 +147,9 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
if (metricOption != null) {
sinkMetricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
}
+ if (dirtySink != null) {
+ dirtySink.open(new Configuration());
+ }
}
/**
@@ -141,14 +161,11 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
if (sinkMetricData != null) {
sinkMetricData.invoke(rowSize, dataSize);
}
- } catch (Exception e) {
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(rowSize, dataSize);
- }
- LOG.error("fileSystem sink commitUpToCheckpoint.", e);
- } finally {
rowSize = 0L;
dataSize = 0L;
+ } catch (Exception e) {
+ LOG.error("fileSystem sink commitUpToCheckpoint.", e);
+ throw e;
}
}
@@ -221,11 +238,34 @@ public abstract class AbstractStreamingWriter<IN, OUT>
extends AbstractStreamOpe
currentWatermark);
rowSize = rowSize + 1;
dataSize = dataSize +
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
} catch (Exception e) {
+ LOG.error("StreamingWriter write failed", e);
+ if (!dirtyOptions.ignoreDirty()) {
+ throw new RuntimeException(e);
+ }
if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(1L,
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+ sinkMetricData.invokeDirty(1L,
+
element.getValue().toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(element.getValue())
+ .setDirtyType(DirtyType.UNDEFINED)
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setDirtyMessage(e.getMessage())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOGGER.warn("Dirty sink failed", ex);
+ }
}
- LOG.error("fileSystem sink processElement.", e);
}
}
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
index 1fb2e3742..97168a43c 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingFileWriter.java
@@ -24,7 +24,10 @@ import org.apache.flink.runtime.state.StateSnapshotContext;
import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.NavigableMap;
@@ -45,8 +48,10 @@ public class StreamingFileWriter<IN> extends
AbstractStreamingWriter<IN, Partiti
public StreamingFileWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<IN, String, ? extends
StreamingFileSink.BucketsBuilder<IN, String, ?>> bucketsBuilder,
- String inlongMetric, String inlongAudit) {
- super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
+ String inlongMetric, String inlongAudit,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
+ super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit,
dirtyOptions, dirtySink);
}
@Override
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
index 5408e34be..2a26ed4cd 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/StreamingSink.java
@@ -42,8 +42,11 @@ import
org.apache.flink.table.filesystem.stream.compact.CompactOperator;
import org.apache.flink.table.filesystem.stream.compact.CompactReader;
import org.apache.flink.table.filesystem.stream.compact.CompactWriter;
import org.apache.flink.util.function.SupplierWithException;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.filesystem.stream.compact.CompactFileWriter;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
@@ -66,9 +69,12 @@ public class StreamingSink {
DataStream<T> inputStream,
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<T, String, ? extends
StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder,
- int parallelism, String inlongMetric, String inlongAudit) {
+ int parallelism, String inlongMetric, String inlongAudit,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
StreamingFileWriter<T> fileWriter =
- new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder,
inlongMetric, inlongAudit);
+ new StreamingFileWriter<>(bucketCheckInterval, bucketsBuilder,
+ inlongMetric, inlongAudit, dirtyOptions, dirtySink);
return inputStream
.transform(
StreamingFileWriter.class.getSimpleName(),
@@ -89,9 +95,11 @@ public class StreamingSink {
Path path,
CompactReader.Factory<T> readFactory,
long targetFileSize,
- int parallelism, String inlongMetric, String inlongAudit) {
+ int parallelism, String inlongMetric, String inlongAudit,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
CompactFileWriter<T> writer = new
CompactFileWriter<>(bucketCheckInterval, bucketsBuilder, inlongMetric,
- inlongAudit);
+ inlongAudit, dirtyOptions, dirtySink);
SupplierWithException<FileSystem, IOException> fsSupplier =
(SupplierWithException<FileSystem, IOException> &
Serializable) () -> fsFactory.create(path.toUri());
@@ -118,7 +126,6 @@ public class StreamingSink {
CompactOperator<T> compacter =
new CompactOperator<>(fsSupplier, readFactory, writerFactory);
-
return coordinatorOp
.broadcast()
.transform(
diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
index 731aca18f..385894bc5 100644
--- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
+++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/compact/CompactFileWriter.java
@@ -24,8 +24,12 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.CoordinatorInput;
import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
import
org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.filesystem.stream.AbstractStreamingWriter;
+import javax.annotation.Nullable;
+
/**
* Writer for emitting {@link InputFile} and {@link EndCheckpoint} to
downstream.
*/
@@ -38,8 +42,10 @@ public class CompactFileWriter<T>
public CompactFileWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<T, String, ? extends
StreamingFileSink.BucketsBuilder<T, String, ?>> bucketsBuilder,
- String inlongMetric, String inlongAudit) {
- super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit);
+ String inlongMetric, String inlongAudit,
+ DirtyOptions dirtyOptions,
+ @Nullable DirtySink<Object> dirtySink) {
+ super(bucketCheckInterval, bucketsBuilder, inlongMetric, inlongAudit,
dirtyOptions, dirtySink);
}
@Override
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
index c4c71791b..f1010dfd5 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/FlinkSqlParserTest.java
@@ -162,9 +162,16 @@ public class FlinkSqlParserTest extends AbstractTestBase {
new FieldInfo("age", new IntFormatInfo())),
new FieldRelation(new FieldInfo("ts", new
TimestampFormatInfo()),
new FieldInfo("ts", new
TimestampFormatInfo())));
+ Map<String, String> properties = new LinkedHashMap<>();
+ properties.put("dirty.side-output.connector", "log");
+ properties.put("dirty.ignore", "true");
+ properties.put("dirty.side-output.enable", "true");
+ properties.put("dirty.side-output.format", "csv");
+ properties.put("dirty.side-output.labels",
+
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
return new FileSystemLoadNode(id, "hdfs_output", fields, relations,
- null, "hdfs://localhost:9000/file", "json",
- 1, null, null, null);
+ null, "hdfs://localhost:9000/inlong/student", "json",
+ 1, properties, null, null);
}
/**