This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8c7d31dceb [INLONG-11352][SDK] Add dirty data collection sdk (#11354)
8c7d31dceb is described below
commit 8c7d31dceb75b2db40e4eee6650b32b355cbfed2
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Oct 21 14:15:45 2024 +0800
[INLONG-11352][SDK] Add dirty data collection sdk (#11354)
Co-authored-by: AloysZhang <[email protected]>
Co-authored-by: Charles Zhang <[email protected]>
---
inlong-sdk/dirty-data-sdk/README.md | 53 +++++
inlong-sdk/dirty-data-sdk/pom.xml | 96 +++++++++
.../org/apache/inlong/sdk/dirtydata/Constants.java | 56 ++++++
.../org/apache/inlong/sdk/dirtydata/DirtyData.java | 149 ++++++++++++++
.../inlong/sdk/dirtydata/DirtyDataCollector.java | 219 +++++++++++++++++++++
.../apache/inlong/sdk/dirtydata/DirtyOptions.java | 93 +++++++++
.../org/apache/inlong/sdk/dirtydata/DirtySink.java | 57 ++++++
.../inlong/sdk/dirtydata/PatternReplaceUtils.java | 46 +++++
.../inlong/sdk/dirtydata/sink/Configure.java | 51 +++++
.../sdk/dirtydata/sink/InlongSdkDirtySink.java | 154 +++++++++++++++
.../sdk/dirtydata/sink/InlongSdkOptions.java | 51 +++++
.../inlong/sdk/dirtydata/sink/LabelUtils.java | 67 +++++++
inlong-sdk/pom.xml | 1 +
13 files changed, 1093 insertions(+)
diff --git a/inlong-sdk/dirty-data-sdk/README.md
b/inlong-sdk/dirty-data-sdk/README.md
new file mode 100644
index 0000000000..8c3b748f70
--- /dev/null
+++ b/inlong-sdk/dirty-data-sdk/README.md
@@ -0,0 +1,53 @@
+## Overview
+
+This SDK is used to collect dirty data and store it in a designated storage
location.
+
+## Features
+
+### Independent SDK
+
+A standalone SDK that does not depend on platform-specific libraries (such as Flink),
+so it can be used by the Agent, DataProxy, and Sort modules.
+
+### Scalable multiple data storage options
+
+Dirty data can be stored in various different storage locations (currently
only supports sending to
+DataProxy).
+
+## Usage
+
+### Create DirtyDataCollector object
+
+```java
+ Map<String, String> configMap = new ConcurrentHashMap<>();
+ // Enable dirty data collection
+ configMap.put(DIRTY_COLLECT_ENABLE, "true");
+ // Whether to ignore errors that occur during dirty data collection
+ configMap.put(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, "true");
+ // The storage for dirty data; currently only 'inlong' is supported,
+ // which means sending the data to DataProxy
+ configMap.put(DIRTY_SIDE_OUTPUT_CONNECTOR, "inlong");
+ // The labels of dirty side-output, format is 'key1=value1&key2=value2'
+ configMap.put(DIRTY_SIDE_OUTPUT_LABELS, "key1=value1&key2=value2");
+ // The log tag of the dirty side-output; it supports variable replacement such as '${variable}'.
+ configMap.put(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData");
+ Configure config = new Configure(configMap);
+
+ DirtyDataCollector collector = new DirtyDataCollector();
+ collector.open(config);
+```
+
+### Collect dirty data
+
+```java
+ // In fact, the dirty data we encounter is often parsed incorrectly,
+ // so we use byte [] as the format for dirty data.
+ byte[] dirtyData =
"xxxxxxxxxyyyyyyyyyyyyyy".getBytes(StandardCharsets.UTF_8);
+ // The kind of error can be recorded here, such as missing fields, type errors, or unknown errors.
+ String dirtyType = "Undefined";
+ // Details of errors can be passed here.
+ Throwable error = new Throwable();
+ collector.invoke(dirtyData, dirtyType, error);
+```
+
+ |
\ No newline at end of file
diff --git a/inlong-sdk/dirty-data-sdk/pom.xml
b/inlong-sdk/dirty-data-sdk/pom.xml
new file mode 100644
index 0000000000..853e873094
--- /dev/null
+++ b/inlong-sdk/dirty-data-sdk/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-sdk</artifactId>
+ <version>2.1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>dirty-data-sdk</artifactId>
+ <name>Apache InLong - Dirty Data SDK</name>
+ <properties>
+ <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>inlong-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sdk-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>dataproxy-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>3.1.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.2.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${plugin.compile.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${plugin.surefire.version}</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.0.2</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-install-plugin</artifactId>
+ <version>2.5.2</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>${plugin.deploy.version}</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.7.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>3.0.0</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+
+</project>
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
new file mode 100644
index 0000000000..933f81a67b
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+/**
+ * Option keys and delimiters used by the dirty data SDK.
+ */
+public final class Constants {
+
+ /**
+ * Option key: whether dirty data collection is enabled.
+ */
+ public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable";
+
+ /**
+ * Option key: the type of the sink that dirty data is written to.
+ */
+ public static final String DIRTY_SIDE_OUTPUT_CONNECTOR =
"dirty.side-output.connector";
+
+ /**
+ * Option key: whether errors raised while writing dirty data are ignored.
+ */
+ public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS =
"dirty.side-output.ignore-errors";
+
+ /**
+ * The labels of dirty side-output, format is 'key1=value1&key2=value2'.
+ * It supports variable replacement like '${variable}'.
+ * Three system variables [SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE]
+ * are currently supported,
+ * and the support of other variables is determined by the connector.
+ */
+ public static final String DIRTY_SIDE_OUTPUT_LABELS =
"dirty.side-output.labels";
+
+ /**
+ * The log tag of dirty side-output, it supports variable replacement like '${variable}'.
+ * Three system variables [SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
+ * and the support of other variables is determined by the connector.
+ */
+ public static final String DIRTY_SIDE_OUTPUT_LOG_TAG =
"dirty.side-output.log-tag";
+
+ /**
+ * The delimiter between key-value pairs, it is used for 'inlong.metric.labels' or 'sink.dirty.labels'
+ */
+ public static final String DELIMITER = "&";
+
+ /**
+ * The delimiter of key and value, it is used for 'inlong.metric.labels' or 'sink.dirty.labels'
+ */
+ public static final String KEY_VALUE_DELIMITER = "=";
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
new file mode 100644
index 0000000000..93caf8b57e
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Wrapper around a single dirty record: the raw bytes plus the metadata
+ * (type, message, labels, log tag, identifier) handed to a {@link DirtySink}.
+ */
+public class DirtyData {
+
+    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+    private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+
+    private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    /**
+     * The identifier of dirty data, it will be used for filename generation of file dirty sink,
+     * topic generation of mq dirty sink, tablename generation of database, etc,
+     * and it supports variable replacement like '${variable}'.
+     * Three system variables [SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
+     * and the support of other variables is determined by the connector.
+     */
+    private final String identifier;
+    /**
+     * The labels of the dirty data, it will be written to the storage system of dirty data
+     */
+    private final String labels;
+    /**
+     * The log tag of dirty data, it is only used to format log as follows:
+     * [${logTag}] ${labels} ${data}
+     */
+    private final String logTag;
+    /**
+     * Dirty type
+     */
+    private final String dirtyType;
+    /**
+     * Dirty describe message, it is the cause of dirty data
+     */
+    private final String dirtyMessage;
+    /**
+     * The real dirty data
+     */
+    private final byte[] data;
+
+    /**
+     * Creates a dirty record. The '${variable}' placeholders in {@code identifier}, {@code labels}
+     * and {@code logTag} are resolved once, here, against SYSTEM_TIME, DIRTY_TYPE and DIRTY_MESSAGE.
+     *
+     * @param data The raw dirty bytes
+     * @param identifier The identifier template, may be null
+     * @param labels The labels template, may be null
+     * @param logTag The log tag template, may be null
+     * @param dirtyType The dirty type, may be null
+     * @param dirtyMessage The cause of the dirty data, may be null
+     */
+    public DirtyData(byte[] data, String identifier, String labels,
+            String logTag, String dirtyType, String dirtyMessage) {
+        this.data = data;
+        this.dirtyType = dirtyType;
+        this.dirtyMessage = dirtyMessage;
+        Map<String, String> paramMap = genParamMap();
+        this.labels = PatternReplaceUtils.replace(labels, paramMap);
+        this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
+        this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builds the system variables available to the templates.
+     * A missing type or message is mapped to the empty string: a null value in the map
+     * would otherwise be handed to the regex replacement and fail with a NullPointerException
+     * (for example when the cause is a Throwable without a message).
+     */
+    private Map<String, String> genParamMap() {
+        Map<String, String> paramMap = new HashMap<>();
+        paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, nullToEmpty(dirtyType));
+        paramMap.put(DIRTY_MESSAGE_KEY, nullToEmpty(dirtyMessage));
+        return paramMap;
+    }
+
+    private static String nullToEmpty(String value) {
+        return value == null ? "" : value;
+    }
+
+    public String getLabels() {
+        return labels;
+    }
+
+    public String getLogTag() {
+        return logTag;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+    public String getDirtyType() {
+        return dirtyType;
+    }
+
+    public String getDirtyMessage() {
+        return dirtyMessage;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Builder of {@link DirtyData}; the dirty type defaults to "UNDEFINED".
+     */
+    public static class Builder {
+
+        private String identifier;
+        private String labels;
+        private String logTag;
+        private String dirtyType = "UNDEFINED";
+        private String dirtyMessage;
+        private byte[] data;
+
+        public Builder setDirtyType(String dirtyType) {
+            this.dirtyType = dirtyType;
+            return this;
+        }
+
+        public Builder setLabels(String labels) {
+            this.labels = labels;
+            return this;
+        }
+
+        public Builder setData(byte[] data) {
+            this.data = data;
+            return this;
+        }
+
+        public Builder setLogTag(String logTag) {
+            this.logTag = logTag;
+            return this;
+        }
+
+        public Builder setDirtyMessage(String dirtyMessage) {
+            this.dirtyMessage = dirtyMessage;
+            return this;
+        }
+
+        /**
+         * Sets the identifier template; without this setter the identifier could never be populated.
+         */
+        public Builder setIdentifier(String identifier) {
+            this.identifier = identifier;
+            return this;
+        }
+
+        public DirtyData build() {
+            return new DirtyData(data, identifier, labels, logTag, dirtyType, dirtyMessage);
+        }
+    }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
new file mode 100644
index 0000000000..bd8afffe62
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dirtydata.DirtyData.Builder;
+import org.apache.inlong.sdk.dirtydata.sink.Configure;
+import org.apache.inlong.sdk.dirtydata.sink.InlongSdkDirtySink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Entry point of the dirty data SDK: it reads the dirty options, creates the configured
+ * {@link DirtySink} and forwards dirty records to it.
+ */
+public class DirtyDataCollector implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = LoggerFactory.getLogger(DirtyDataCollector.class);
+    static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+    private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+    private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private DirtyOptions dirtyOptions;
+    private DirtySink dirtySink;
+
+    public DirtyDataCollector() {
+    }
+
+    /**
+     * Open for dirty sink. Does nothing beyond option validation when collection is disabled.
+     *
+     * @param configuration The configuration that is used for dirty sink
+     * @throws RuntimeException If the options are invalid, the sink type is not supported,
+     *         or the sink fails to open
+     */
+    public void open(Configure configuration) {
+        if (dirtyOptions == null) {
+            dirtyOptions = DirtyOptions.fromConfig(configuration);
+        }
+        dirtyOptions.validate();
+        if (!dirtyOptions.isEnableDirtyCollect()) {
+            return;
+        }
+        DirtySink sink = createDirtySink(dirtyOptions.getSinkType());
+        if (sink == null) {
+            // Fail with an explicit message instead of a NullPointerException on the next line.
+            throw new IllegalStateException("Unsupported dirty sink type '" + dirtyOptions.getSinkType() + "'");
+        }
+        try {
+            // The sink is opened exactly once, here; the factory below only instantiates it.
+            sink.open(configuration);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        dirtySink = sink;
+    }
+
+    /**
+     * Instantiates (without opening) the sink for the given type.
+     *
+     * @param sinkType The configured sink type
+     * @return The sink, or null if the type is not supported
+     */
+    private DirtySink createDirtySink(String sinkType) {
+        switch (sinkType) {
+            case "inlong":
+                return new InlongSdkDirtySink();
+            default:
+                LOGGER.error("invalid dirty sink type {}", sinkType);
+                return null;
+        }
+    }
+
+    /**
+     * Dirty data sink, using the labels and log tag from the dirty options.
+     * It is a no-op if the collector has not been opened.
+     *
+     * @param data The dirty data
+     * @param dirtyType The dirty type
+     * @param e The cause of dirty data
+     */
+    public void invoke(byte[] data, String dirtyType, Throwable e) {
+        if (dirtyOptions == null) {
+            return;
+        }
+        invoke(data, dirtyType, dirtyOptions.getLabels(), dirtyOptions.getLogTag(), e);
+    }
+
+    /**
+     * Dirty data sink. It is a no-op if collection is disabled or no sink has been opened.
+     *
+     * @param data The dirty data
+     * @param dirtyType The dirty type
+     * @param label The dirty label
+     * @param logTag The dirty logTag
+     * @param e The cause of dirty data, may be null
+     * @throws RuntimeException If the sink fails and side-output errors are not ignored
+     */
+    public void invoke(byte[] data, String dirtyType, String label, String logTag, Throwable e) {
+        if (dirtyOptions == null || !dirtyOptions.isEnableDirtyCollect() || dirtySink == null) {
+            return;
+        }
+        try {
+            Builder builder = DirtyData.builder()
+                    .setData(data)
+                    .setDirtyType(dirtyType)
+                    .setLabels(label)
+                    .setLogTag(logTag)
+                    .setDirtyMessage(e == null ? null : e.getMessage());
+            dirtySink.invoke(builder.build());
+        } catch (Exception ex) {
+            if (!dirtyOptions.isIgnoreSideOutputErrors()) {
+                throw new RuntimeException(ex);
+            }
+            LOGGER.warn("Dirty sink failed", ex);
+        }
+    }
+
+    /**
+     * Replaces the system variables ${SYSTEM_TIME}, ${DIRTY_TYPE} and ${DIRTY_MESSAGE} in the pattern.
+     * Placeholders whose variable is unknown or has no value are left untouched.
+     *
+     * @param pattern The template, may be null
+     * @param dirtyType The value of ${DIRTY_TYPE}
+     * @param dirtyMessage The value of ${DIRTY_MESSAGE}
+     * @return The expanded template, or null if the pattern is null
+     */
+    public static String regexReplace(String pattern, String dirtyType, String dirtyMessage) {
+        if (pattern == null) {
+            return null;
+        }
+        return replaceVariables(pattern, systemParams(dirtyType, dirtyMessage));
+    }
+
+    /**
+     * Replaces the system variables plus ${database}, ${table}, ${schema} and their 'source.'-prefixed
+     * forms. Used in cases where jsonDynamicFormat.parse() is not usable.
+     * Placeholders whose variable is unknown or has no value are left untouched.
+     *
+     * @return The expanded template, or null if the pattern is null
+     */
+    public static String regexReplace(String pattern, String dirtyType, String dirtyMessage, String database,
+            String table, String schema) {
+        if (pattern == null) {
+            return null;
+        }
+        Map<String, String> paramMap = systemParams(dirtyType, dirtyMessage);
+        paramMap.put("source.database", database);
+        paramMap.put("database", database);
+        paramMap.put("source.table", table);
+        paramMap.put("table", table);
+        if (schema != null) {
+            paramMap.put("source.schema", schema);
+            paramMap.put("schema", schema);
+        }
+        return replaceVariables(pattern, paramMap);
+    }
+
+    /** Builds the map of system variables shared by both regexReplace overloads. */
+    private static Map<String, String> systemParams(String dirtyType, String dirtyMessage) {
+        Map<String, String> paramMap = new HashMap<>();
+        paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, dirtyType);
+        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
+        return paramMap;
+    }
+
+    /** Expands every '${key}' whose key has a non-null value in the map. */
+    private static String replaceVariables(String pattern, Map<String, String> paramMap) {
+        Matcher matcher = REGEX_PATTERN.matcher(pattern);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String replacement = paramMap.get(matcher.group(1));
+            if (replacement == null) {
+                // Skipping appendReplacement keeps the placeholder text as it is.
+                continue;
+            }
+            // Quote the value: '$' and '\' in it must not be read as group references or escapes.
+            matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+
+    public void setDirtyOptions(DirtyOptions dirtyOptions) {
+        this.dirtyOptions = dirtyOptions;
+    }
+
+    public DirtyOptions getDirtyOptions() {
+        return dirtyOptions;
+    }
+
+    @Nullable
+    public DirtySink getDirtySink() {
+        return dirtySink;
+    }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
new file mode 100644
index 0000000000..d70127adf8
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dirtydata.sink.Configure;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_COLLECT_ENABLE;
+import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR;
+import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LABELS;
+import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
+
+/**
+ * Dirty common options, read from a {@link Configure}.
+ */
+@Data
+@Builder
+@Getter
+public class DirtyOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final String DEFAULT_FORMAT = "csv";
+    private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
+    private final boolean enableDirtyCollect;
+    private final boolean ignoreSideOutputErrors;
+    private final String sinkType;
+    private final String labels;
+    private final String logTag;
+    private final String format;
+    private final String csvFieldDelimiter;
+
+    private DirtyOptions(boolean enableDirtyCollect, boolean ignoreSideOutputErrors,
+            String sinkType, String labels, String logTag, String format, String csvFieldDelimiter) {
+        this.enableDirtyCollect = enableDirtyCollect;
+        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
+        this.sinkType = sinkType;
+        this.labels = labels;
+        this.logTag = logTag;
+        this.format = format;
+        this.csvFieldDelimiter = csvFieldDelimiter;
+    }
+
+    /**
+     * Get dirty options from {@link Configure}.
+     * Collection is disabled by default, sink errors are ignored by default and the log tag
+     * defaults to "DirtyData"; format and CSV delimiter are fixed to "csv" and ",".
+     *
+     * @param config The config
+     * @return Dirty options
+     */
+    public static DirtyOptions fromConfig(Configure config) {
+        boolean enableDirtyCollect = config.getBoolean(DIRTY_COLLECT_ENABLE, false);
+        boolean ignoreSinkError = config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true);
+        String dirtyConnector = config.get(DIRTY_SIDE_OUTPUT_CONNECTOR, null);
+        String labels = config.get(DIRTY_SIDE_OUTPUT_LABELS, null);
+        String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData");
+        String format = DEFAULT_FORMAT;
+        String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
+        return new DirtyOptions(enableDirtyCollect, ignoreSinkError,
+                dirtyConnector, labels, logTag, format, csvFieldDelimiter);
+    }
+
+    /**
+     * Checks that a sink type is configured whenever dirty collection is enabled.
+     *
+     * @throws RuntimeException If collection is enabled but the sink type is null or blank
+     */
+    public void validate() {
+        if (!enableDirtyCollect) {
+            return;
+        }
+        if (sinkType == null || sinkType.trim().length() == 0) {
+            // Name the option keys that this SDK actually reads, so the message is actionable.
+            throw new RuntimeException(
+                    "The option '" + DIRTY_SIDE_OUTPUT_CONNECTOR + "' is not allowed to be empty "
+                            + "when the option '" + DIRTY_COLLECT_ENABLE + "' is 'true'");
+        }
+    }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
new file mode 100644
index 0000000000..68d8cc9110
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dirtydata.sink.Configure;
+
+import java.io.Serializable;
+
+/**
+ * The dirty sink base interface.
+ * Implementations write {@link DirtyData} to a storage; {@code open} and {@code close}
+ * are optional and do nothing by default.
+ */
+public interface DirtySink extends Serializable {
+
+ /**
+ * Open for dirty sink
+ *
+ * @param configuration The configuration that is used for dirty sink
+ * @throws Exception The exception may be thrown when executing
+ */
+ default void open(Configure configuration) throws Exception {
+
+ }
+
+ /**
+ * Invoke that is used to sink dirty data
+ *
+ * @param dirtyData The dirty data that will be written
+ * @throws Exception The exception may be thrown when executing
+ */
+ void invoke(DirtyData dirtyData) throws Exception;
+
+ /**
+ * Close for dirty sink
+ *
+ * @throws Exception The exception may be thrown when executing
+ */
+ default void close() throws Exception {
+
+ }
+
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
new file mode 100644
index 0000000000..20f70c5205
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The pattern replace utils: expands '${variable}' placeholders in a template string.
+ */
+public final class PatternReplaceUtils {
+
+    private static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}",
+            Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Replaces every '${key}' in the pattern with the value mapped to {@code key}.
+     * A key that is absent from the map, or mapped to null, is replaced by the key name itself.
+     *
+     * @param pattern The template, may be null
+     * @param params The variable values, may be null (treated as empty)
+     * @return The expanded template, or null if the pattern is null
+     */
+    public static String replace(String pattern, Map<String, String> params) {
+        if (pattern == null) {
+            return null;
+        }
+        Matcher matcher = REGEX_PATTERN.matcher(pattern);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String keyText = matcher.group(1);
+            // getOrDefault would still return null for a key explicitly mapped to null,
+            // so the fallback is applied after the lookup.
+            String replacement = params == null ? null : params.get(keyText);
+            if (replacement == null) {
+                replacement = keyText;
+            }
+            // Quote the value: '$' and '\' in it must not be read as group references or escapes.
+            matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
new file mode 100644
index 0000000000..e2031915c6
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * String key/value settings, copied from the map given at construction time.
+ */
+public class Configure {
+
+    private Map<String, String> data;
+
+    /**
+     * @param data The settings to copy; later changes to this map are not reflected
+     */
+    public Configure(Map<String, String> data) {
+        this.data = new ConcurrentHashMap<>(data);
+    }
+
+    /**
+     * @return The value stored under the key, or {@code defaultValue} if there is none
+     */
+    public String get(String key, String defaultValue) {
+        String found = data.get(key);
+        return found == null ? defaultValue : found;
+    }
+
+    /**
+     * @return The value stored under the key, or null if there is none
+     */
+    public String get(String key) {
+        return data.get(key);
+    }
+
+    /**
+     * @return The value stored under the key parsed with {@link Boolean#valueOf(String)},
+     *         or {@code defaultValue} if there is none
+     */
+    public Boolean getBoolean(String key, boolean defaultValue) {
+        String raw = data.get(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        return Boolean.valueOf(raw);
+    }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
new file mode 100644
index 0000000000..bef0fc3110
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.MessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dirtydata.DirtyData;
+import org.apache.inlong.sdk.dirtydata.DirtySink;
+
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+
+@Slf4j
+public class InlongSdkDirtySink implements DirtySink {
+
+ // The inlong manager addr to init inlong sdk
+ private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR =
+ "dirty.side-output.inlong-sdk.inlong-manager-addr";
+ // The inlong manager auth id to init inlong sdk
+ private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID =
"dirty.side-output.inlong-sdk.inlong-auth-id";
+ // The inlong manager auth id to init inlong sdk
+ private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY =
"dirty.side-output.inlong-sdk.inlong-auth-key";
+ // The inlong group id of dirty sink
+ private static final String DIRTY_SIDE_OUTPUT_INLONG_GROUP =
"dirty.side-output.inlong-sdk.inlong-group-id";
+ // The inlong stream id of dirty sink
+ private static final String DIRTY_SIDE_OUTPUT_INLONG_STREAM =
"dirty.side-output.inlong-sdk.inlong-stream-id";
+
+ private InlongSdkOptions options;
+ private String inlongGroupId;
+ private String inlongStreamId;
+ private final SendMessageCallback callback;
+
+ private transient DateTimeFormatter dateTimeFormatter;
+ private transient MessageSender sender;
+
+ public InlongSdkDirtySink() {
+ this.callback = new LogCallBack();
+ }
+
+ @Override
+ public void open(Configure configuration) throws Exception {
+ options = getOptions(configuration);
+ this.inlongGroupId = options.getInlongGroupId();
+ this.inlongStreamId = options.getInlongStreamId();
+ dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ // init sender
+ ProxyClientConfig proxyClientConfig =
+ new ProxyClientConfig(options.getInlongManagerAddr(),
options.getInlongGroupId(),
+ options.getInlongManagerAuthId(),
options.getInlongManagerAuthKey());
+ sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+ }
+
+ @Override
+ public void invoke(DirtyData dirtyData) {
+ try {
+ Map<String, String> labelMap =
LabelUtils.parseLabels(dirtyData.getLabels());
+ String groupId =
Preconditions.checkNotNull(labelMap.get("groupId"));
+ String streamId =
Preconditions.checkNotNull(labelMap.get("streamId"));
+
+ String message = join(groupId, streamId,
+ dirtyData.getDirtyType(), dirtyData.getLabels(),
+ new String(dirtyData.getData(), StandardCharsets.UTF_8));
+ sender.asyncSendMessage(inlongGroupId, inlongStreamId,
message.getBytes(), callback);
+ } catch (Throwable t) {
+ log.error("failed to send dirty message to inlong sdk", t);
+ }
+ }
+
+ private InlongSdkOptions getOptions(Configure config) {
+ return InlongSdkOptions.builder()
+
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR))
+ .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
+ .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
+
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
+
.ignoreSideOutputErrors(config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS,
true))
+ .enableDirtyLog(true)
+ .build();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (sender != null) {
+ sender.close();
+ }
+ }
+
+ private String join(
+ String inlongGroup,
+ String inlongStream,
+ String type,
+ String label,
+ String formattedData) {
+
+ String now = LocalDateTime.now().format(dateTimeFormatter);
+
+ StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
+ return joiner.add(inlongGroup + "." + inlongStream)
+ .add(now)
+ .add(type)
+ .add(label)
+ .add(formattedData).toString();
+ }
+
+ class LogCallBack implements SendMessageCallback {
+
+ @Override
+ public void onMessageAck(SendResult result) {
+ if (result == SendResult.OK) {
+ return;
+ }
+ log.error("failed to send inlong dirty message, response={}",
result);
+
+ if (!options.isIgnoreSideOutputErrors()) {
+ throw new RuntimeException("writing dirty message to inlong
sdk failed, response=" + result);
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ log.error("failed to send inlong dirty message", e);
+
+ if (!options.isIgnoreSideOutputErrors()) {
+ throw new RuntimeException("writing dirty message to inlong
sdk failed", e);
+ }
+ }
+ }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
new file mode 100644
index 0000000000..b657a97f20
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+/**
+ * Options of the InLong SDK dirty sink.
+ *
+ * <p>Instances are created through the Lombok-generated builder. Fields that declare a default
+ * value are annotated with {@code @Builder.Default}; without it the builder ignores the field
+ * initializer and leaves the field {@code null} when it is not set explicitly.
+ */
+@Data
+@Builder
+@Getter
+public class InlongSdkOptions implements Serializable {
+
+    private static final String DEFAULT_FORMAT = "csv";
+
+    private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
+    private static final String DEFAULT_CSV_LINE_DELIMITER = "\n";
+
+    private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
+    private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String inlongManagerAddr;
+    private String inlongManagerAuthKey;
+    private String inlongManagerAuthId;
+    @Builder.Default
+    private String format = DEFAULT_FORMAT;
+    private boolean ignoreSideOutputErrors;
+    private boolean enableDirtyLog;
+    // Must not be null when built: the sink passes it to StringJoiner, which rejects a null delimiter
+    @Builder.Default
+    private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
+    @Builder.Default
+    private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
+    @Builder.Default
+    private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
+    @Builder.Default
+    private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
new file mode 100644
index 0000000000..2ce58b134d
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.inlong.sdk.dirtydata.Constants.DELIMITER;
+import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER;
+
+/**
+ * The label utils class, is used to parse the labels to a label map
+ */
+public final class LabelUtils {
+
+    private LabelUtils() {
+    }
+
+    /**
+     * Parses a label string into a new insertion-ordered map.
+     *
+     * @param labels The labels format by 'key1=value1&key2=value2...'
+     * @return The label map of labels; empty when {@code labels} is null or empty
+     * @throws IllegalArgumentException if an entry has no key, no value or no key-value delimiter
+     */
+    public static Map<String, String> parseLabels(String labels) {
+        return parseLabels(labels, new LinkedHashMap<>());
+    }
+
+    /**
+     * Parses a label string and stores every entry in the given map.
+     *
+     * @param labels The labels format by 'key1=value1&key2=value2...'
+     * @param labelMap The map that receives the parsed entries; a new insertion-ordered map is
+     *                 used when it is null
+     * @return The map holding the parsed labels; returned unchanged when {@code labels} is null or empty
+     * @throws IllegalArgumentException if an entry has no key, no value or no key-value delimiter
+     */
+    public static Map<String, String> parseLabels(String labels, Map<String, String> labelMap) {
+        Map<String, String> result = labelMap != null ? labelMap : new LinkedHashMap<>();
+        if (labels == null || labels.isEmpty()) {
+            return result;
+        }
+        for (String entry : labels.split(DELIMITER)) {
+            int separatorAt = entry.indexOf(KEY_VALUE_DELIMITER);
+            // The key is everything before the delimiter, the value everything after it
+            boolean hasKey = separatorAt >= 1;
+            boolean hasValue = separatorAt != entry.length() - 1;
+            if (!(hasKey && hasValue)) {
+                throw new IllegalArgumentException("The format of labels must be like 'key1=value1&key2=value2...'");
+            }
+            String key = entry.substring(0, separatorAt);
+            String value = entry.substring(separatorAt + 1);
+            result.put(key, value);
+        }
+        return result;
+    }
+}
diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index 21b99e24eb..27025d06f5 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -34,6 +34,7 @@
<module>sort-sdk</module>
<module>dataproxy-sdk</module>
<module>transform-sdk</module>
+ <module>dirty-data-sdk</module>
</modules>
<properties>