This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fd80e0373b [INLONG-11426][SDK] Optimize dirty data sdk (#11427)
fd80e0373b is described below
commit fd80e0373be8e38e102db973691e81fd1efc41eb
Author: vernedeng <[email protected]>
AuthorDate: Mon Oct 28 20:55:33 2024 +0800
[INLONG-11426][SDK] Optimize dirty data sdk (#11427)
* [INLONG-11426][SDK] Optimize dirty data sdk
* [INLONG-11426][SDK] Optimize dirty data sdk
---
.../org/apache/inlong/sdk/dirtydata/Constants.java | 56 ------
.../org/apache/inlong/sdk/dirtydata/DirtyData.java | 149 --------------
.../inlong/sdk/dirtydata/DirtyDataCollector.java | 219 ---------------------
.../inlong/sdk/dirtydata/DirtyMessageWrapper.java | 63 ++++++
.../apache/inlong/sdk/dirtydata/DirtyOptions.java | 93 ---------
.../org/apache/inlong/sdk/dirtydata/DirtySink.java | 57 ------
.../inlong/sdk/dirtydata/InlongSdkDirtySink.java | 87 ++++++++
.../inlong/sdk/dirtydata/PatternReplaceUtils.java | 46 -----
.../inlong/sdk/dirtydata/sink/Configure.java | 51 -----
.../sdk/dirtydata/sink/InlongSdkDirtySink.java | 154 ---------------
.../sdk/dirtydata/sink/InlongSdkOptions.java | 51 -----
.../inlong/sdk/dirtydata/sink/LabelUtils.java | 67 -------
12 files changed, 150 insertions(+), 943 deletions(-)
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
deleted file mode 100644
index 933f81a67b..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-/**
- * Connector base option constant
- */
-public final class Constants {
-
- public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable";
-
- public static final String DIRTY_SIDE_OUTPUT_CONNECTOR =
"dirty.side-output.connector";
-
- public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS =
"dirty.side-output.ignore-errors";
-
- /**
- * The labels of dirty side-output, format is 'key1=value1&key2=value2'
- * it supports variable replace like '${variable}'
- * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE]
- * are currently supported,
- * and the support of other variables is determined by the connector.
- */
- public static final String DIRTY_SIDE_OUTPUT_LABELS =
"dirty.side-output.labels";
-
- /**
- * The log tag of dirty side-output, it supports variable replace like
'${variable}'.
- * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE]
are currently supported,
- * and the support of other variables is determined by the connector.
- */
- public static final String DIRTY_SIDE_OUTPUT_LOG_TAG =
"dirty.side-output.log-tag";
-
- /**
- * It is used for 'inlong.metric.labels' or 'sink.dirty.labels'
- */
- public static final String DELIMITER = "&";
-
- /**
- * The delimiter of key and value, it is used for 'inlong.metric.labels'
or 'sink.dirty.labels'
- */
- public static final String KEY_VALUE_DELIMITER = "=";
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
deleted file mode 100644
index 93caf8b57e..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Dirty data base class, it is a wrapper of dirty data
- */
-public class DirtyData {
-
- private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
-
- private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
- private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
-
- private static final DateTimeFormatter DATE_TIME_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
- /**
- * The identifier of dirty data, it will be used for filename generation
of file dirty sink,
- * topic generation of mq dirty sink, tablename generation of database,
etc,
- * and it supports variable replace like '${variable}'.
- * There are several system
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
- * and the support of other variables is determined by the connector.
- */
- private final String identifier;
- /**
- * The labels of the dirty data, it will be written to store system of
dirty
- */
- private final String labels;
- /**
- * The log tag of dirty data, it is only used to format log as follows:
- * [${logTag}] ${labels} ${data}
- */
- private final String logTag;
- /**
- * Dirty type
- */
- private final String dirtyType;
- /**
- * Dirty describe message, it is the cause of dirty data
- */
- private final String dirtyMessage;
- /**
- * The real dirty data
- */
- private final byte[] data;
-
- public DirtyData(byte[] data, String identifier, String labels,
- String logTag, String dirtyType, String dirtyMessage) {
- this.data = data;
- this.dirtyType = dirtyType;
- this.dirtyMessage = dirtyMessage;
- Map<String, String> paramMap = genParamMap();
- this.labels = PatternReplaceUtils.replace(labels, paramMap);
- this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
- this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
-
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- private Map<String, String> genParamMap() {
- Map<String, String> paramMap = new HashMap<>();
- paramMap.put(SYSTEM_TIME_KEY,
DATE_TIME_FORMAT.format(LocalDateTime.now()));
- paramMap.put(DIRTY_TYPE_KEY, dirtyType);
- paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
- return paramMap;
- }
-
- public String getLabels() {
- return labels;
- }
-
- public String getLogTag() {
- return logTag;
- }
-
- public byte[] getData() {
- return data;
- }
-
- public String getDirtyType() {
- return dirtyType;
- }
-
- public String getIdentifier() {
- return identifier;
- }
-
- public static class Builder {
-
- private String identifier;
- private String labels;
- private String logTag;
- private String dirtyType = "UNDEFINED";
- private String dirtyMessage;
- private byte[] data;
-
- public Builder setDirtyType(String dirtyType) {
- this.dirtyType = dirtyType;
- return this;
- }
-
- public Builder setLabels(String labels) {
- this.labels = labels;
- return this;
- }
-
- public Builder setData(byte[] data) {
- this.data = data;
- return this;
- }
-
- public Builder setLogTag(String logTag) {
- this.logTag = logTag;
- return this;
- }
-
- public Builder setDirtyMessage(String dirtyMessage) {
- this.dirtyMessage = dirtyMessage;
- return this;
- }
-
- public DirtyData build() {
- return new DirtyData(data, identifier, labels, logTag, dirtyType,
dirtyMessage);
- }
- }
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
deleted file mode 100644
index bd8afffe62..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import org.apache.inlong.sdk.dirtydata.DirtyData.Builder;
-import org.apache.inlong.sdk.dirtydata.sink.Configure;
-import org.apache.inlong.sdk.dirtydata.sink.InlongSdkDirtySink;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Dirty sink helper, it helps dirty data sink for {@link DirtySink}
- */
-public class DirtyDataCollector implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final Logger LOGGER =
LoggerFactory.getLogger(DirtyDataCollector.class);
- static final Pattern REGEX_PATTERN =
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
- private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
- private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
- private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
- private static final DateTimeFormatter DATE_TIME_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private DirtyOptions dirtyOptions;
- private DirtySink dirtySink;
- private Configure config;
-
- public DirtyDataCollector() {
- }
-
- /**
- * Open for dirty sink
- *
- * @param configuration The configuration that is used for dirty sink
- */
- public void open(Configure configuration) {
- config = configuration;
- if (dirtyOptions == null) {
- dirtyOptions = DirtyOptions.fromConfig(configuration);
- }
- dirtyOptions.validate();
- if (!dirtyOptions.isEnableDirtyCollect()) {
- return;
- }
- dirtySink = createDirtySink(dirtyOptions.getSinkType());
- try {
- dirtySink.open(configuration);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- private DirtySink createDirtySink(String sinkType) {
- DirtySink sink;
- try {
- switch (sinkType) {
- case "inlong": {
- sink = new InlongSdkDirtySink();
- sink.open(config);
- return sink;
- }
- default: {
- LOGGER.error("invalid dirty sink type {}", sinkType);
- return null;
- }
- }
- } catch (Exception e) {
- LOGGER.error("create dirty sink error", e);
- }
- return null;
- }
-
- /**
- * Dirty data sink
- *
- * @param data The dirty data
- * @param dirtyType The dirty type
- * @param e The cause of dirty data
- */
- public void invoke(byte[] data, String dirtyType, Throwable e) {
- invoke(data, dirtyType, dirtyOptions.getLabels(),
dirtyOptions.getLogTag(), e);
- }
-
- /**
- * Dirty data sink
- *
- * @param data The dirty data
- * @param dirtyType The dirty type
- * @param label The dirty label
- * @param logTag The dirty logTag
- * @param e The cause of dirty data
- */
- public void invoke(byte[] data, String dirtyType, String label, String
logTag, Throwable e) {
- if (!dirtyOptions.isEnableDirtyCollect()) {
- return;
- }
- if (dirtySink != null) {
- Builder builder = DirtyData.builder();
- try {
- builder.setData(data)
- .setDirtyType(dirtyType)
- .setLabels(label)
- .setLogTag(logTag)
- .setDirtyMessage(e.getMessage());
- dirtySink.invoke(builder.build());
- } catch (Exception ex) {
- if (!dirtyOptions.isIgnoreSideOutputErrors()) {
- throw new RuntimeException(ex);
- }
- LOGGER.warn("Dirty sink failed", ex);
- }
- }
- }
-
- /**
- * replace ${SYSTEM_TIME} with real time
- *
- * @param pattern
- * @return
- */
- public static String regexReplace(String pattern, String dirtyType, String
dirtyMessage) {
- if (pattern == null) {
- return null;
- }
-
- Map<String, String> paramMap = new HashMap<>(6);
- paramMap.put(SYSTEM_TIME_KEY,
DATE_TIME_FORMAT.format(LocalDateTime.now()));
- paramMap.put(DIRTY_TYPE_KEY, dirtyType);
- paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
-
- Matcher matcher = REGEX_PATTERN.matcher(pattern);
- StringBuffer sb = new StringBuffer();
- while (matcher.find()) {
- String keyText = matcher.group(1);
- String replacement = paramMap.get(keyText);
- if (replacement == null) {
- continue;
- }
- matcher.appendReplacement(sb, replacement);
- }
- matcher.appendTail(sb);
- return sb.toString();
- }
-
- /**
- * replace ${database} ${table} etc. Used in cases where
jsonDynamicFormat.parse() is not usable.
- */
- public static String regexReplace(String pattern, String dirtyType, String
dirtyMessage, String database,
- String table, String schema) {
- if (pattern == null) {
- return null;
- }
-
- Map<String, String> paramMap = new HashMap<>(6);
- paramMap.put(SYSTEM_TIME_KEY,
DATE_TIME_FORMAT.format(LocalDateTime.now()));
- paramMap.put(DIRTY_TYPE_KEY, dirtyType);
- paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
- paramMap.put("source.database", database);
- paramMap.put("database", database);
- paramMap.put("source.table", table);
- paramMap.put("table", table);
- if (schema != null) {
- paramMap.put("source.schema", schema);
- paramMap.put("schema", schema);
- }
-
- Matcher matcher = REGEX_PATTERN.matcher(pattern);
- StringBuffer sb = new StringBuffer();
- while (matcher.find()) {
- String keyText = matcher.group(1);
- String replacement = paramMap.get(keyText);
- if (replacement == null) {
- continue;
- }
- matcher.appendReplacement(sb, replacement);
- }
- matcher.appendTail(sb);
- return sb.toString();
- }
-
- public void setDirtyOptions(DirtyOptions dirtyOptions) {
- this.dirtyOptions = dirtyOptions;
- }
-
- public DirtyOptions getDirtyOptions() {
- return dirtyOptions;
- }
-
- @Nullable
- public DirtySink getDirtySink() {
- return dirtySink;
- }
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
new file mode 100644
index 0000000000..984c456480
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import lombok.Builder;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Base64;
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * One dirty record, rendered as a single delimited line by {@link #format()}.
+ *
+ * <p>Instances are created through the Lombok-generated builder.
+ */
+@Builder
+public class DirtyMessageWrapper {
+
+    // DateTimeFormatter is immutable and thread-safe, so one shared constant is enough.
+    private static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    // Separator placed between fields; must be set, since StringJoiner rejects a null delimiter.
+    private String delimiter;
+
+    private String inlongGroupId;
+    private String inlongStreamId;
+    private String dataTime;
+    private String dataflowId;
+    private String serverType;
+    private String dirtyType;
+    private String ext;
+    // Textual payload; used in preference to dataBytes when both are set.
+    private String data;
+    // Binary payload; written Base64-encoded when data is null.
+    private byte[] dataBytes;
+
+    /**
+     * Renders this record as one delimited line. Fields appear in this order: group id,
+     * stream id, current local time, data time, dataflow id, server type, dirty type, ext,
+     * payload.
+     *
+     * <p>Unset (null) fields are written as empty strings rather than the literal {@code "null"}.
+     * Values are not escaped, so a value containing the delimiter shifts the columns after it.
+     *
+     * @return the formatted line
+     * @throws NullPointerException if no delimiter was configured
+     */
+    public String format() {
+        String now = LocalDateTime.now().format(DATE_TIME_FORMATTER);
+        String formatData = null;
+        if (data != null) {
+            formatData = data;
+        } else if (dataBytes != null) {
+            formatData = Base64.getEncoder().encodeToString(dataBytes);
+        }
+
+        return new StringJoiner(delimiter)
+                .add(Objects.toString(inlongGroupId, ""))
+                .add(Objects.toString(inlongStreamId, ""))
+                .add(now)
+                .add(Objects.toString(dataTime, ""))
+                .add(Objects.toString(dataflowId, ""))
+                .add(Objects.toString(serverType, ""))
+                .add(Objects.toString(dirtyType, ""))
+                .add(Objects.toString(ext, ""))
+                .add(Objects.toString(formatData, ""))
+                .toString();
+    }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
deleted file mode 100644
index d70127adf8..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import org.apache.inlong.sdk.dirtydata.sink.Configure;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-
-import java.io.Serializable;
-
-import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_COLLECT_ENABLE;
-import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR;
-import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
-import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LABELS;
-import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
-
-/**
- * Dirty common options
- */
-@Data
-@Builder
-@Getter
-public class DirtyOptions implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final String DEFAULT_FORMAT = "csv";
- private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
- private final boolean enableDirtyCollect;
- private final boolean ignoreSideOutputErrors;
- private final String sinkType;
- private final String labels;
- private final String logTag;
- private final String format;
- private final String csvFieldDelimiter;
-
- private DirtyOptions(boolean enableDirtyCollect, boolean
ignoreSideOutputErrors,
- String sinkType, String labels, String logTag, String format,
String csvFieldDelimiter) {
- this.enableDirtyCollect = enableDirtyCollect;
- this.ignoreSideOutputErrors = ignoreSideOutputErrors;
- this.sinkType = sinkType;
- this.labels = labels;
- this.logTag = logTag;
- this.format = format;
- this.csvFieldDelimiter = csvFieldDelimiter;
- }
-
- /**
- * Get dirty options from {@link Configure}
- *
- * @param config The config
- * @return Dirty options
- */
- public static DirtyOptions fromConfig(Configure config) {
- boolean enableDirtyCollect = config.getBoolean(DIRTY_COLLECT_ENABLE,
false);
- boolean ignoreSinkError =
config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true);
- String dirtyConnector = config.get(DIRTY_SIDE_OUTPUT_CONNECTOR, null);
- String labels = config.get(DIRTY_SIDE_OUTPUT_LABELS, null);
- String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData");
- String format = DEFAULT_FORMAT;
- String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
- return new DirtyOptions(enableDirtyCollect, ignoreSinkError,
- dirtyConnector, labels, logTag, format, csvFieldDelimiter);
- }
-
- public void validate() {
- if (!enableDirtyCollect) {
- return;
- }
- if (sinkType == null || sinkType.trim().length() == 0) {
- throw new RuntimeException(
- "The option 'dirty.side-output.connector' is not allowed
to be empty "
- + "when the option 'dirty.ignore' is 'true' "
- + "and the option 'dirty.side-output.enable' is
'true'");
- }
- }
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
deleted file mode 100644
index 68d8cc9110..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import org.apache.inlong.sdk.dirtydata.sink.Configure;
-
-import java.io.Serializable;
-
-/**
- * The dirty sink base inteface
- *
- */
-public interface DirtySink extends Serializable {
-
- /**
- * Open for dirty sink
- *
- * @param configuration The configuration that is used for dirty sink
- * @throws Exception The exception may be thrown when executing
- */
- default void open(Configure configuration) throws Exception {
-
- }
-
- /**
- * Invoke that is used to sink dirty data
- *
- * @param dirtyData The dirty data that will be written
- * @throws Exception The exception may be thrown when executing
- */
- void invoke(DirtyData dirtyData) throws Exception;
-
- /**
- * Close for dirty sink
- *
- * @throws Exception The exception may be thrown when executing
- */
- default void close() throws Exception {
-
- }
-
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
new file mode 100644
index 0000000000..2240ebdb6c
--- /dev/null
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dirtydata;
+
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.MessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
+
+import com.google.common.base.Preconditions;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Builder
+public class InlongSdkDirtySink {
+
+ private String inlongGroupId;
+ private String inlongStreamId;
+ private String inlongManagerAddr;
+ private String authId;
+ private String authKey;
+ private boolean ignoreErrors;
+
+ private SendMessageCallback callback;
+ private MessageSender sender;
+
+ public void init() throws Exception {
+ Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be
null");
+ Preconditions.checkNotNull(inlongStreamId, "inlongStreamId cannot be
null");
+ Preconditions.checkNotNull(inlongManagerAddr, "inlongManagerAddr
cannot be null");
+ Preconditions.checkNotNull(authId, "authId cannot be null");
+ Preconditions.checkNotNull(authKey, "authKey cannot be null");
+
+ this.callback = new LogCallBack();
+ ProxyClientConfig proxyClientConfig =
+ new ProxyClientConfig(inlongManagerAddr, inlongGroupId,
authId, authKey);
+ this.sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+ log.info("init InlongSdkDirtySink successfully, target group={},
stream={}", inlongGroupId, inlongStreamId);
+ }
+
+ public void sendDirtyMessage(DirtyMessageWrapper messageWrapper)
+ throws ProxysdkException {
+ sender.asyncSendMessage(inlongGroupId, inlongStreamId,
messageWrapper.format().getBytes(), callback);
+ }
+
+ class LogCallBack implements SendMessageCallback {
+
+ @Override
+ public void onMessageAck(SendResult result) {
+ if (result == SendResult.OK) {
+ return;
+ }
+ log.error("failed to send inlong dirty message, response={}",
result);
+
+ if (!ignoreErrors) {
+ throw new RuntimeException("writing dirty message to inlong
sdk failed, response=" + result);
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ log.error("failed to send inlong dirty message", e);
+
+ if (!ignoreErrors) {
+ throw new RuntimeException("writing dirty message to inlong
sdk failed", e);
+ }
+ }
+ }
+}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
deleted file mode 100644
index 20f70c5205..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata;
-
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * The pattern replace utils
- */
-public final class PatternReplaceUtils {
-
- private static final Pattern REGEX_PATTERN =
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}",
- Pattern.CASE_INSENSITIVE);
-
- public static String replace(String pattern, Map<String, String> params) {
- if (pattern == null) {
- return null;
- }
- Matcher matcher = REGEX_PATTERN.matcher(pattern);
- StringBuffer sb = new StringBuffer();
- while (matcher.find()) {
- String keyText = matcher.group(1);
- String replacement = params.getOrDefault(keyText, keyText);
- matcher.appendReplacement(sb, replacement);
- }
- matcher.appendTail(sb);
- return sb.toString();
- }
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
deleted file mode 100644
index e2031915c6..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class Configure {
-
- private Map<String, String> data;
-
- public Configure(Map<String, String> data) {
- this.data = new ConcurrentHashMap<>();
- this.data.putAll(data);
- }
-
- public String get(String key, String defaultValue) {
- String value = data.get(key);
- if (value != null) {
- return value;
- }
- return defaultValue;
- }
-
- public String get(String key) {
- return data.get(key);
- }
-
- public Boolean getBoolean(String key, boolean defaultValue) {
- String value = data.get(key);
- if (value != null) {
- return Boolean.valueOf(value);
- }
- return defaultValue;
- }
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
deleted file mode 100644
index bef0fc3110..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.MessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dirtydata.DirtyData;
-import org.apache.inlong.sdk.dirtydata.DirtySink;
-
-import com.google.common.base.Preconditions;
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import java.util.StringJoiner;
-
-import static
org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
-
-@Slf4j
-public class InlongSdkDirtySink implements DirtySink {
-
- // The inlong manager addr to init inlong sdk
- private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR =
- "dirty.side-output.inlong-sdk.inlong-manager-addr";
- // The inlong manager auth id to init inlong sdk
- private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID =
"dirty.side-output.inlong-sdk.inlong-auth-id";
- // The inlong manager auth id to init inlong sdk
- private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY =
"dirty.side-output.inlong-sdk.inlong-auth-key";
- // The inlong group id of dirty sink
- private static final String DIRTY_SIDE_OUTPUT_INLONG_GROUP =
"dirty.side-output.inlong-sdk.inlong-group-id";
- // The inlong stream id of dirty sink
- private static final String DIRTY_SIDE_OUTPUT_INLONG_STREAM =
"dirty.side-output.inlong-sdk.inlong-stream-id";
-
- private InlongSdkOptions options;
- private String inlongGroupId;
- private String inlongStreamId;
- private final SendMessageCallback callback;
-
- private transient DateTimeFormatter dateTimeFormatter;
- private transient MessageSender sender;
-
- public InlongSdkDirtySink() {
- this.callback = new LogCallBack();
- }
-
- @Override
- public void open(Configure configuration) throws Exception {
- options = getOptions(configuration);
- this.inlongGroupId = options.getInlongGroupId();
- this.inlongStreamId = options.getInlongStreamId();
- dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // init sender
- ProxyClientConfig proxyClientConfig =
- new ProxyClientConfig(options.getInlongManagerAddr(),
options.getInlongGroupId(),
- options.getInlongManagerAuthId(),
options.getInlongManagerAuthKey());
- sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
- }
-
- @Override
- public void invoke(DirtyData dirtyData) {
- try {
- Map<String, String> labelMap =
LabelUtils.parseLabels(dirtyData.getLabels());
- String groupId =
Preconditions.checkNotNull(labelMap.get("groupId"));
- String streamId =
Preconditions.checkNotNull(labelMap.get("streamId"));
-
- String message = join(groupId, streamId,
- dirtyData.getDirtyType(), dirtyData.getLabels(),
- new String(dirtyData.getData(), StandardCharsets.UTF_8));
- sender.asyncSendMessage(inlongGroupId, inlongStreamId,
message.getBytes(), callback);
- } catch (Throwable t) {
- log.error("failed to send dirty message to inlong sdk", t);
- }
- }
-
- private InlongSdkOptions getOptions(Configure config) {
- return InlongSdkOptions.builder()
-
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR))
- .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
- .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
-
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
-
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
-
.ignoreSideOutputErrors(config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS,
true))
- .enableDirtyLog(true)
- .build();
- }
-
- @Override
- public void close() throws Exception {
- if (sender != null) {
- sender.close();
- }
- }
-
- private String join(
- String inlongGroup,
- String inlongStream,
- String type,
- String label,
- String formattedData) {
-
- String now = LocalDateTime.now().format(dateTimeFormatter);
-
- StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
- return joiner.add(inlongGroup + "." + inlongStream)
- .add(now)
- .add(type)
- .add(label)
- .add(formattedData).toString();
- }
-
- class LogCallBack implements SendMessageCallback {
-
- @Override
- public void onMessageAck(SendResult result) {
- if (result == SendResult.OK) {
- return;
- }
- log.error("failed to send inlong dirty message, response={}",
result);
-
- if (!options.isIgnoreSideOutputErrors()) {
- throw new RuntimeException("writing dirty message to inlong
sdk failed, response=" + result);
- }
- }
-
- @Override
- public void onException(Throwable e) {
- log.error("failed to send inlong dirty message", e);
-
- if (!options.isIgnoreSideOutputErrors()) {
- throw new RuntimeException("writing dirty message to inlong
sdk failed", e);
- }
- }
- }
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
deleted file mode 100644
index b657a97f20..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import lombok.Builder;
-import lombok.Data;
-import lombok.Getter;
-
-import java.io.Serializable;
-
-@Data
-@Builder
-@Getter
-public class InlongSdkOptions implements Serializable {
-
- private static final String DEFAULT_FORMAT = "csv";
-
- private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
- private static final String DEFAULT_CSV_LINE_DELIMITER = "\n";
-
- private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
- private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
-
- private String inlongGroupId;
- private String inlongStreamId;
- private String inlongManagerAddr;
- private String inlongManagerAuthKey;
- private String inlongManagerAuthId;
- private String format = DEFAULT_FORMAT;
- private boolean ignoreSideOutputErrors;
- private boolean enableDirtyLog;
- private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
- private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
- private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
- private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
-}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
deleted file mode 100644
index 2ce58b134d..0000000000
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.dirtydata.sink;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.inlong.sdk.dirtydata.Constants.DELIMITER;
-import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER;
-
-/**
- * The label utils class, is used to parse the labels to a label map
- */
-public final class LabelUtils {
-
- private LabelUtils() {
- }
-
- /**
- * Parse the labels to label map
- *
- * @param labels The labels format by 'key1=value1&key2=value2...'
- * @return The label map of labels
- */
- public static Map<String, String> parseLabels(String labels) {
- return parseLabels(labels, new LinkedHashMap<>());
- }
-
- /**
- * Parse the labels to label map
- *
- * @param labels The labels format by 'key1=value1&key2=value2...'
- * @return The label map of labels
- */
- public static Map<String, String> parseLabels(String labels, Map<String,
String> labelMap) {
- if (labelMap == null) {
- labelMap = new LinkedHashMap<>();
- }
- if (labels == null || labels.length() == 0) {
- return labelMap;
- }
- String[] labelArray = labels.split(DELIMITER);
- for (String label : labelArray) {
- int index = label.indexOf(KEY_VALUE_DELIMITER);
- if (index < 1 || index == label.length() - 1) {
- throw new IllegalArgumentException("The format of labels must
be like 'key1=value1&key2=value2...'");
- }
- labelMap.put(label.substring(0, index), label.substring(index +
1));
- }
- return labelMap;
- }
-}