This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c7d31dceb [INLONG-11352][SDK] Add dirty data collection sdk (#11354)
8c7d31dceb is described below

commit 8c7d31dceb75b2db40e4eee6650b32b355cbfed2
Author: justinwwhuang <[email protected]>
AuthorDate: Mon Oct 21 14:15:45 2024 +0800

    [INLONG-11352][SDK] Add dirty data collection sdk (#11354)
    
    Co-authored-by: AloysZhang <[email protected]>
    Co-authored-by: Charles Zhang <[email protected]>
---
 inlong-sdk/dirty-data-sdk/README.md                |  53 +++++
 inlong-sdk/dirty-data-sdk/pom.xml                  |  96 +++++++++
 .../org/apache/inlong/sdk/dirtydata/Constants.java |  56 ++++++
 .../org/apache/inlong/sdk/dirtydata/DirtyData.java | 149 ++++++++++++++
 .../inlong/sdk/dirtydata/DirtyDataCollector.java   | 219 +++++++++++++++++++++
 .../apache/inlong/sdk/dirtydata/DirtyOptions.java  |  93 +++++++++
 .../org/apache/inlong/sdk/dirtydata/DirtySink.java |  57 ++++++
 .../inlong/sdk/dirtydata/PatternReplaceUtils.java  |  46 +++++
 .../inlong/sdk/dirtydata/sink/Configure.java       |  51 +++++
 .../sdk/dirtydata/sink/InlongSdkDirtySink.java     | 154 +++++++++++++++
 .../sdk/dirtydata/sink/InlongSdkOptions.java       |  51 +++++
 .../inlong/sdk/dirtydata/sink/LabelUtils.java      |  67 +++++++
 inlong-sdk/pom.xml                                 |   1 +
 13 files changed, 1093 insertions(+)

diff --git a/inlong-sdk/dirty-data-sdk/README.md 
b/inlong-sdk/dirty-data-sdk/README.md
new file mode 100644
index 0000000000..8c3b748f70
--- /dev/null
+++ b/inlong-sdk/dirty-data-sdk/README.md
@@ -0,0 +1,53 @@
+## Overview
+
+This SDK collects dirty data and stores it in a designated storage location.
+
+## Features
+
+### Independent SDK
+
+The SDK does not depend on platform-specific libraries (such as Flink), so it can be used by the Agent,
+DataProxy, and Sort modules.
+
+### Scalable multiple data storage options
+
+Dirty data can be stored in different storage backends (currently only sending to
+DataProxy is supported).
+
+## Usage
+
+### Create DirtyDataCollector object
+
+```java
+        Map<String, String> configMap = new ConcurrentHashMap<>();
+        // Enable dirty data collection
+        configMap.put(DIRTY_COLLECT_ENABLE, "true");
+        // Whether to ignore (only log) errors that occur while sinking dirty data
+        configMap.put(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, "true");
+        // The storage where dirty data will be stored; currently only 'inlong' is supported,
+        // which means sending the data to DataProxy
+        configMap.put(DIRTY_SIDE_OUTPUT_CONNECTOR, "inlong");
+        // The labels of dirty side-output, format is 'key1=value1&key2=value2'
+        configMap.put(DIRTY_SIDE_OUTPUT_LABELS, "key1=value1&key2=value2");
+        // The log tag of dirty side-output, it supports variable replace like '${variable}'.
+        configMap.put(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData");
+        Configure config = new Configure(configMap);
+
+        DirtyDataCollector collector = new DirtyDataCollector();
+        collector.open(config);
+```
+
+### Collect dirty data
+
+```java
+    // Dirty data usually could not be parsed correctly,
+    // so it is passed as raw bytes (byte[]).
+    byte[] dirtyData = "xxxxxxxxxyyyyyyyyyyyyyy".getBytes(StandardCharsets.UTF_8);
+    // The dirty type classifies the problem, e.g. missing fields, type errors, or unknown errors.
+    String dirtyType = "Undefined";
+    // Details of errors can be passed here.
+    Throwable error = new Throwable();
+    collector.invoke(dirtyData, dirtyType, error);
+```
+
+
\ No newline at end of file
diff --git a/inlong-sdk/dirty-data-sdk/pom.xml 
b/inlong-sdk/dirty-data-sdk/pom.xml
new file mode 100644
index 0000000000..853e873094
--- /dev/null
+++ b/inlong-sdk/dirty-data-sdk/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>inlong-sdk</artifactId>
+        <version>2.1.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>dirty-data-sdk</artifactId>
+    <name>Apache InLong - Dirty Data SDK</name>
+    <properties>
+        <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>inlong-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sdk-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>dataproxy-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <artifactId>maven-clean-plugin</artifactId>
+                    <version>3.1.0</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-resources-plugin</artifactId>
+                    <version>3.2.0</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>${plugin.compile.version}</version>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>${plugin.surefire.version}</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-jar-plugin</artifactId>
+                    <version>3.0.2</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-install-plugin</artifactId>
+                    <version>2.5.2</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-deploy-plugin</artifactId>
+                    <version>${plugin.deploy.version}</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-site-plugin</artifactId>
+                    <version>3.7.1</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-project-info-reports-plugin</artifactId>
+                    <version>3.0.0</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+</project>
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
new file mode 100644
index 0000000000..933f81a67b
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+/**
+ * Connector base option constant
+ */
+public final class Constants {
+
+    public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable";
+
+    public static final String DIRTY_SIDE_OUTPUT_CONNECTOR = 
"dirty.side-output.connector";
+
+    public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS = 
"dirty.side-output.ignore-errors";
+
+    /**
+     * The labels of dirty side-output, format is 'key1=value1&key2=value2'
+     * it supports variable replace like '${variable}'
+     * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE]
+     * are currently supported,
+     * and the support of other variables is determined by the connector.
+     */
+    public static final String DIRTY_SIDE_OUTPUT_LABELS = 
"dirty.side-output.labels";
+
+    /**
+     * The log tag of dirty side-output, it supports variable replace like 
'${variable}'.
+     * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] 
are currently supported,
+     * and the support of other variables is determined by the connector.
+     */
+    public static final String DIRTY_SIDE_OUTPUT_LOG_TAG = 
"dirty.side-output.log-tag";
+
+    /**
+     * It is used for 'inlong.metric.labels' or 'sink.dirty.labels'
+     */
+    public static final String DELIMITER = "&";
+
+    /**
+     * The delimiter of key and value, it is used for 'inlong.metric.labels' 
or 'sink.dirty.labels'
+     */
+    public static final String KEY_VALUE_DELIMITER = "=";
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
new file mode 100644
index 0000000000..93caf8b57e
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Dirty data base class, it is a wrapper of dirty data
+ */
+public class DirtyData {
+
+    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+
+    private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+
+    private static final DateTimeFormatter DATE_TIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    /**
+     * The identifier of dirty data, it will be used for filename generation 
of file dirty sink,
+     * topic generation of mq dirty sink, tablename generation of database, 
etc,
+     * and it supports variable replace like '${variable}'.
+     * There are several system 
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
+     * and the support of other variables is determined by the connector.
+     */
+    private final String identifier;
+    /**
+     * The labels of the dirty data, it will be written to store system of 
dirty
+     */
+    private final String labels;
+    /**
+     * The log tag of dirty data, it is only used to format log as follows:
+     * [${logTag}] ${labels} ${data}
+     */
+    private final String logTag;
+    /**
+     * Dirty type
+     */
+    private final String dirtyType;
+    /**
+     * Dirty describe message, it is the cause of dirty data
+     */
+    private final String dirtyMessage;
+    /**
+     * The real dirty data
+     */
+    private final byte[] data;
+
+    public DirtyData(byte[] data, String identifier, String labels,
+            String logTag, String dirtyType, String dirtyMessage) {
+        this.data = data;
+        this.dirtyType = dirtyType;
+        this.dirtyMessage = dirtyMessage;
+        Map<String, String> paramMap = genParamMap();
+        this.labels = PatternReplaceUtils.replace(labels, paramMap);
+        this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
+        this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
+
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    private Map<String, String> genParamMap() {
+        Map<String, String> paramMap = new HashMap<>();
+        paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, dirtyType);
+        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
+        return paramMap;
+    }
+
+    public String getLabels() {
+        return labels;
+    }
+
+    public String getLogTag() {
+        return logTag;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+    public String getDirtyType() {
+        return dirtyType;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public static class Builder {
+
+        private String identifier;
+        private String labels;
+        private String logTag;
+        private String dirtyType = "UNDEFINED";
+        private String dirtyMessage;
+        private byte[] data;
+
+        public Builder setDirtyType(String dirtyType) {
+            this.dirtyType = dirtyType;
+            return this;
+        }
+
+        public Builder setLabels(String labels) {
+            this.labels = labels;
+            return this;
+        }
+
+        public Builder setData(byte[] data) {
+            this.data = data;
+            return this;
+        }
+
+        public Builder setLogTag(String logTag) {
+            this.logTag = logTag;
+            return this;
+        }
+
+        public Builder setDirtyMessage(String dirtyMessage) {
+            this.dirtyMessage = dirtyMessage;
+            return this;
+        }
+
+        public DirtyData build() {
+            return new DirtyData(data, identifier, labels, logTag, dirtyType, 
dirtyMessage);
+        }
+    }
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
new file mode 100644
index 0000000000..bd8afffe62
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dirtydata.DirtyData.Builder;
+import org.apache.inlong.sdk.dirtydata.sink.Configure;
+import org.apache.inlong.sdk.dirtydata.sink.InlongSdkDirtySink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Dirty sink helper, it helps dirty data sink for {@link DirtySink}
+ */
+public class DirtyDataCollector implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DirtyDataCollector.class);
+    static final Pattern REGEX_PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+    private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+    private static final DateTimeFormatter DATE_TIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    private DirtyOptions dirtyOptions;
+    private DirtySink dirtySink;
+    private Configure config;
+
+    public DirtyDataCollector() {
+    }
+
+    /**
+     * Open for dirty sink
+     *
+     * @param configuration The configuration that is used for dirty sink
+     */
+    public void open(Configure configuration) {
+        config = configuration;
+        if (dirtyOptions == null) {
+            dirtyOptions = DirtyOptions.fromConfig(configuration);
+        }
+        dirtyOptions.validate();
+        if (!dirtyOptions.isEnableDirtyCollect()) {
+            return;
+        }
+        dirtySink = createDirtySink(dirtyOptions.getSinkType());
+        try {
+            dirtySink.open(configuration);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    private DirtySink createDirtySink(String sinkType) {
+        DirtySink sink;
+        try {
+            switch (sinkType) {
+                case "inlong": {
+                    sink = new InlongSdkDirtySink();
+                    sink.open(config);
+                    return sink;
+                }
+                default: {
+                    LOGGER.error("invalid dirty sink type {}", sinkType);
+                    return null;
+                }
+            }
+        } catch (Exception e) {
+            LOGGER.error("create dirty sink error", e);
+        }
+        return null;
+    }
+
+    /**
+     * Dirty data sink
+     *
+     * @param data The dirty data
+     * @param dirtyType The dirty type
+     * @param e The cause of dirty data
+     */
+    public void invoke(byte[] data, String dirtyType, Throwable e) {
+        invoke(data, dirtyType, dirtyOptions.getLabels(), 
dirtyOptions.getLogTag(), e);
+    }
+
+    /**
+     * Dirty data sink
+     *
+     * @param data The dirty data
+     * @param dirtyType The dirty type
+     * @param label The dirty label
+     * @param logTag The dirty logTag
+     * @param e The cause of dirty data
+     */
+    public void invoke(byte[] data, String dirtyType, String label, String 
logTag, Throwable e) {
+        if (!dirtyOptions.isEnableDirtyCollect()) {
+            return;
+        }
+        if (dirtySink != null) {
+            Builder builder = DirtyData.builder();
+            try {
+                builder.setData(data)
+                        .setDirtyType(dirtyType)
+                        .setLabels(label)
+                        .setLogTag(logTag)
+                        .setDirtyMessage(e.getMessage());
+                dirtySink.invoke(builder.build());
+            } catch (Exception ex) {
+                if (!dirtyOptions.isIgnoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
+                }
+                LOGGER.warn("Dirty sink failed", ex);
+            }
+        }
+    }
+
+    /**
+     * replace ${SYSTEM_TIME} with real time
+     *
+     * @param pattern
+     * @return
+     */
+    public static String regexReplace(String pattern, String dirtyType, String 
dirtyMessage) {
+        if (pattern == null) {
+            return null;
+        }
+
+        Map<String, String> paramMap = new HashMap<>(6);
+        paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, dirtyType);
+        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
+
+        Matcher matcher = REGEX_PATTERN.matcher(pattern);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String keyText = matcher.group(1);
+            String replacement = paramMap.get(keyText);
+            if (replacement == null) {
+                continue;
+            }
+            matcher.appendReplacement(sb, replacement);
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+
+    /**
+     * replace ${database} ${table} etc. Used in cases where 
jsonDynamicFormat.parse() is not usable.
+     */
+    public static String regexReplace(String pattern, String dirtyType, String 
dirtyMessage, String database,
+            String table, String schema) {
+        if (pattern == null) {
+            return null;
+        }
+
+        Map<String, String> paramMap = new HashMap<>(6);
+        paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, dirtyType);
+        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
+        paramMap.put("source.database", database);
+        paramMap.put("database", database);
+        paramMap.put("source.table", table);
+        paramMap.put("table", table);
+        if (schema != null) {
+            paramMap.put("source.schema", schema);
+            paramMap.put("schema", schema);
+        }
+
+        Matcher matcher = REGEX_PATTERN.matcher(pattern);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String keyText = matcher.group(1);
+            String replacement = paramMap.get(keyText);
+            if (replacement == null) {
+                continue;
+            }
+            matcher.appendReplacement(sb, replacement);
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+
+    public void setDirtyOptions(DirtyOptions dirtyOptions) {
+        this.dirtyOptions = dirtyOptions;
+    }
+
+    public DirtyOptions getDirtyOptions() {
+        return dirtyOptions;
+    }
+
+    @Nullable
+    public DirtySink getDirtySink() {
+        return dirtySink;
+    }
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
new file mode 100644
index 0000000000..d70127adf8
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dirtydata.sink.Configure;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_COLLECT_ENABLE;
+import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR;
+import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LABELS;
+import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
+
+/**
+ * Dirty common options
+ */
+@Data
+@Builder
+@Getter
+public class DirtyOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final String DEFAULT_FORMAT = "csv";
+    private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
+    private final boolean enableDirtyCollect;
+    private final boolean ignoreSideOutputErrors;
+    private final String sinkType;
+    private final String labels;
+    private final String logTag;
+    private final String format;
+    private final String csvFieldDelimiter;
+
+    private DirtyOptions(boolean enableDirtyCollect, boolean 
ignoreSideOutputErrors,
+            String sinkType, String labels, String logTag, String format, 
String csvFieldDelimiter) {
+        this.enableDirtyCollect = enableDirtyCollect;
+        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
+        this.sinkType = sinkType;
+        this.labels = labels;
+        this.logTag = logTag;
+        this.format = format;
+        this.csvFieldDelimiter = csvFieldDelimiter;
+    }
+
+    /**
+     * Get dirty options from {@link Configure}
+     *
+     * @param config The config
+     * @return Dirty options
+     */
+    public static DirtyOptions fromConfig(Configure config) {
+        boolean enableDirtyCollect = config.getBoolean(DIRTY_COLLECT_ENABLE, 
false);
+        boolean ignoreSinkError = 
config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true);
+        String dirtyConnector = config.get(DIRTY_SIDE_OUTPUT_CONNECTOR, null);
+        String labels = config.get(DIRTY_SIDE_OUTPUT_LABELS, null);
+        String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData");
+        String format = DEFAULT_FORMAT;
+        String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
+        return new DirtyOptions(enableDirtyCollect, ignoreSinkError,
+                dirtyConnector, labels, logTag, format, csvFieldDelimiter);
+    }
+
+    public void validate() {
+        if (!enableDirtyCollect) {
+            return;
+        }
+        if (sinkType == null || sinkType.trim().length() == 0) {
+            throw new RuntimeException(
+                    "The option 'dirty.side-output.connector' is not allowed 
to be empty "
+                            + "when the option 'dirty.ignore' is 'true' "
+                            + "and the option 'dirty.side-output.enable' is 
'true'");
+        }
+    }
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
new file mode 100644
index 0000000000..68d8cc9110
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dirtydata.sink.Configure;
+
+import java.io.Serializable;
+
+/**
+ * The dirty sink base inteface
+ *
+ */
+public interface DirtySink extends Serializable {
+
+    /**
+     * Open for dirty sink
+     *
+     * @param configuration The configuration that is used for dirty sink
+     * @throws Exception The exception may be thrown when executing
+     */
+    default void open(Configure configuration) throws Exception {
+
+    }
+
+    /**
+     * Invoke that is used to sink dirty data
+     *
+     * @param dirtyData The dirty data that will be written
+     * @throws Exception The exception may be thrown when executing
+     */
+    void invoke(DirtyData dirtyData) throws Exception;
+
+    /**
+     * Close for dirty sink
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    default void close() throws Exception {
+
+    }
+
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
new file mode 100644
index 0000000000..20f70c5205
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The pattern replace utils
+ */
+public final class PatternReplaceUtils {
+
+    private static final Pattern REGEX_PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}",
+            Pattern.CASE_INSENSITIVE);
+
+    public static String replace(String pattern, Map<String, String> params) {
+        if (pattern == null) {
+            return null;
+        }
+        Matcher matcher = REGEX_PATTERN.matcher(pattern);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String keyText = matcher.group(1);
+            String replacement = params.getOrDefault(keyText, keyText);
+            matcher.appendReplacement(sb, replacement);
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
new file mode 100644
index 0000000000..e2031915c6
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
 * A read-only view over a string key/value configuration.
 * <p>
 * The supplied map is copied on construction, so later changes to it are not observed. Entries with a
 * {@code null} key or value are skipped, because the backing {@link ConcurrentHashMap} cannot hold nulls.
 */
public class Configure {

    private final Map<String, String> data;

    /**
     * Create a configuration from the given entries.
     *
     * @param data the entries to copy; {@code null} is treated as an empty map
     */
    public Configure(Map<String, String> data) {
        Map<String, String> copy = new ConcurrentHashMap<>();
        if (data != null) {
            for (Map.Entry<String, String> entry : data.entrySet()) {
                // ConcurrentHashMap.put throws NPE on null keys/values, so drop such entries
                if (entry.getKey() != null && entry.getValue() != null) {
                    copy.put(entry.getKey(), entry.getValue());
                }
            }
        }
        this.data = copy;
    }

    /**
     * @param key the configuration key
     * @param defaultValue the value returned when the key is absent
     * @return the configured value, or {@code defaultValue} when the key is absent or {@code null}
     */
    public String get(String key, String defaultValue) {
        String value = get(key);
        return value != null ? value : defaultValue;
    }

    /**
     * @param key the configuration key
     * @return the configured value, or {@code null} when the key is absent or {@code null}
     */
    public String get(String key) {
        return key == null ? null : data.get(key);
    }

    /**
     * @param key the configuration key
     * @param defaultValue the value returned when the key is absent
     * @return {@code Boolean.valueOf(value)} of the configured value (so only a case-insensitive "true" is
     *         true), or {@code defaultValue} when the key is absent
     */
    public Boolean getBoolean(String key, boolean defaultValue) {
        String value = get(key);
        return value != null ? Boolean.valueOf(value) : Boolean.valueOf(defaultValue);
    }
}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
new file mode 100644
index 0000000000..bef0fc3110
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.MessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dirtydata.DirtyData;
+import org.apache.inlong.sdk.dirtydata.DirtySink;
+
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import static 
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+
+@Slf4j
+public class InlongSdkDirtySink implements DirtySink {
+
+    // The inlong manager addr to init inlong sdk
+    private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR =
+            "dirty.side-output.inlong-sdk.inlong-manager-addr";
+    // The inlong manager auth id to init inlong sdk
+    private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = 
"dirty.side-output.inlong-sdk.inlong-auth-id";
+    // The inlong manager auth id to init inlong sdk
+    private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY = 
"dirty.side-output.inlong-sdk.inlong-auth-key";
+    // The inlong group id of dirty sink
+    private static final String DIRTY_SIDE_OUTPUT_INLONG_GROUP = 
"dirty.side-output.inlong-sdk.inlong-group-id";
+    // The inlong stream id of dirty sink
+    private static final String DIRTY_SIDE_OUTPUT_INLONG_STREAM = 
"dirty.side-output.inlong-sdk.inlong-stream-id";
+
+    private InlongSdkOptions options;
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private final SendMessageCallback callback;
+
+    private transient DateTimeFormatter dateTimeFormatter;
+    private transient MessageSender sender;
+
+    public InlongSdkDirtySink() {
+        this.callback = new LogCallBack();
+    }
+
+    @Override
+    public void open(Configure configuration) throws Exception {
+        options = getOptions(configuration);
+        this.inlongGroupId = options.getInlongGroupId();
+        this.inlongStreamId = options.getInlongStreamId();
+        dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+        // init sender
+        ProxyClientConfig proxyClientConfig =
+                new ProxyClientConfig(options.getInlongManagerAddr(), 
options.getInlongGroupId(),
+                        options.getInlongManagerAuthId(), 
options.getInlongManagerAuthKey());
+        sender = 
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+    }
+
+    @Override
+    public void invoke(DirtyData dirtyData) {
+        try {
+            Map<String, String> labelMap = 
LabelUtils.parseLabels(dirtyData.getLabels());
+            String groupId = 
Preconditions.checkNotNull(labelMap.get("groupId"));
+            String streamId = 
Preconditions.checkNotNull(labelMap.get("streamId"));
+
+            String message = join(groupId, streamId,
+                    dirtyData.getDirtyType(), dirtyData.getLabels(),
+                    new String(dirtyData.getData(), StandardCharsets.UTF_8));
+            sender.asyncSendMessage(inlongGroupId, inlongStreamId, 
message.getBytes(), callback);
+        } catch (Throwable t) {
+            log.error("failed to send dirty message to inlong sdk", t);
+        }
+    }
+
+    private InlongSdkOptions getOptions(Configure config) {
+        return InlongSdkOptions.builder()
+                
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR))
+                .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
+                .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+                
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
+                
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
+                
.ignoreSideOutputErrors(config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, 
true))
+                .enableDirtyLog(true)
+                .build();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (sender != null) {
+            sender.close();
+        }
+    }
+
+    private String join(
+            String inlongGroup,
+            String inlongStream,
+            String type,
+            String label,
+            String formattedData) {
+
+        String now = LocalDateTime.now().format(dateTimeFormatter);
+
+        StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
+        return joiner.add(inlongGroup + "." + inlongStream)
+                .add(now)
+                .add(type)
+                .add(label)
+                .add(formattedData).toString();
+    }
+
+    class LogCallBack implements SendMessageCallback {
+
+        @Override
+        public void onMessageAck(SendResult result) {
+            if (result == SendResult.OK) {
+                return;
+            }
+            log.error("failed to send inlong dirty message, response={}", 
result);
+
+            if (!options.isIgnoreSideOutputErrors()) {
+                throw new RuntimeException("writing dirty message to inlong 
sdk failed, response=" + result);
+            }
+        }
+
+        @Override
+        public void onException(Throwable e) {
+            log.error("failed to send inlong dirty message", e);
+
+            if (!options.isIgnoreSideOutputErrors()) {
+                throw new RuntimeException("writing dirty message to inlong 
sdk failed", e);
+            }
+        }
+    }
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
new file mode 100644
index 0000000000..b657a97f20
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@Getter
+public class InlongSdkOptions implements Serializable {
+
+    private static final String DEFAULT_FORMAT = "csv";
+
+    private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
+    private static final String DEFAULT_CSV_LINE_DELIMITER = "\n";
+
+    private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
+    private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String inlongManagerAddr;
+    private String inlongManagerAuthKey;
+    private String inlongManagerAuthId;
+    private String format = DEFAULT_FORMAT;
+    private boolean ignoreSideOutputErrors;
+    private boolean enableDirtyLog;
+    private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
+    private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
+    private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
+    private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
+}
diff --git 
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
new file mode 100644
index 0000000000..2ce58b134d
--- /dev/null
+++ 
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata.sink;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.inlong.sdk.dirtydata.Constants.DELIMITER;
+import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER;
+
+/**
+ * The label utils class, is used to parse the labels to a label map
+ */
+public final class LabelUtils {
+
+    private LabelUtils() {
+    }
+
+    /**
+     * Parse the labels to label map
+     *
+     * @param labels The labels format by 'key1=value1&key2=value2...'
+     * @return The label map of labels
+     */
+    public static Map<String, String> parseLabels(String labels) {
+        return parseLabels(labels, new LinkedHashMap<>());
+    }
+
+    /**
+     * Parse the labels to label map
+     *
+     * @param labels The labels format by 'key1=value1&key2=value2...'
+     * @return The label map of labels
+     */
+    public static Map<String, String> parseLabels(String labels, Map<String, 
String> labelMap) {
+        if (labelMap == null) {
+            labelMap = new LinkedHashMap<>();
+        }
+        if (labels == null || labels.length() == 0) {
+            return labelMap;
+        }
+        String[] labelArray = labels.split(DELIMITER);
+        for (String label : labelArray) {
+            int index = label.indexOf(KEY_VALUE_DELIMITER);
+            if (index < 1 || index == label.length() - 1) {
+                throw new IllegalArgumentException("The format of labels must 
be like 'key1=value1&key2=value2...'");
+            }
+            labelMap.put(label.substring(0, index), label.substring(index + 
1));
+        }
+        return labelMap;
+    }
+}
diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml
index 21b99e24eb..27025d06f5 100644
--- a/inlong-sdk/pom.xml
+++ b/inlong-sdk/pom.xml
@@ -34,6 +34,7 @@
         <module>sort-sdk</module>
         <module>dataproxy-sdk</module>
         <module>transform-sdk</module>
+        <module>dirty-data-sdk</module>
     </modules>
 
     <properties>


Reply via email to