This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ab59b91be2 [INLONG-11481][Sort] Tube Connector source supports dirty
data archiving (#11482)
ab59b91be2 is described below
commit ab59b91be2a2d84b0c0155d84a1da6bad204cc4d
Author: vernedeng <[email protected]>
AuthorDate: Mon Nov 11 18:40:56 2024 +0800
[INLONG-11481][Sort] Tube Connector source supports dirty data archiving
(#11482)
---
.../inlong/sdk/dirtydata/DirtyMessageWrapper.java | 27 +++--
...SdkDirtySink.java => InlongSdkDirtySender.java} | 19 +++-
inlong-sort/sort-flink/base/pom.xml | 5 +
.../org/apache/inlong/sort/base/Constants.java | 2 +-
.../apache/inlong/sort/base/dirty/DirtyData.java | 40 ++++++-
...gSdkOptions.java => InlongSdkDirtyOptions.java} | 7 +-
.../base/dirty/sink/sdk/InlongSdkDirtySink.java | 115 ++++++++-------------
.../dirty/sink/sdk/InlongSdkDirtySinkFactory.java | 39 +++----
.../sort-connectors/tubemq/pom.xml | 9 ++
.../table/DynamicTubeMQDeserializationSchema.java | 2 +-
.../DynamicTubeMQTableDeserializationSchema.java | 61 ++++++++++-
.../tubemq/table/TubeMQDynamicTableFactory.java | 37 ++++---
.../sort/tubemq/table/TubeMQTableSource.java | 15 ++-
13 files changed, 243 insertions(+), 135 deletions(-)
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
index 984c456480..977e002478 100644
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -18,12 +18,17 @@
package org.apache.inlong.sdk.dirtydata;
import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.text.StringEscapeUtils;
+import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.StringJoiner;
+@Slf4j
@Builder
public class DirtyMessageWrapper {
@@ -32,16 +37,17 @@ public class DirtyMessageWrapper {
private String inlongGroupId;
private String inlongStreamId;
- private String dataTime;
+ private long dataTime;
private String dataflowId;
private String serverType;
private String dirtyType;
+ private String dirtyMessage;
private String ext;
private String data;
private byte[] dataBytes;
public String format() {
- String now = LocalDateTime.now().format(dateTimeFormatter);
+ String reportTime = LocalDateTime.now().format(dateTimeFormatter);
StringJoiner joiner = new StringJoiner(delimiter);
String formatData = null;
if (data != null) {
@@ -50,14 +56,19 @@ public class DirtyMessageWrapper {
formatData = Base64.getEncoder().encodeToString(dataBytes);
}
- return joiner.add(inlongGroupId)
- .add(inlongStreamId)
- .add(now)
- .add(dataTime)
+ String dataTimeStr =
LocalDateTime.ofInstant(Instant.ofEpochMilli(dataTime),
+ ZoneId.systemDefault()).format(dateTimeFormatter);
+ return joiner
.add(dataflowId)
+ .add(inlongGroupId)
+ .add(inlongStreamId)
+ .add(reportTime)
+ .add(dataTimeStr)
.add(serverType)
.add(dirtyType)
- .add(ext)
- .add(formatData).toString();
+ .add(StringEscapeUtils.escapeXSI(dirtyMessage))
+ .add(StringEscapeUtils.escapeXSI(ext))
+ .add(StringEscapeUtils.escapeXSI(formatData))
+ .toString();
}
}
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
similarity index 86%
rename from
inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
rename to
inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 2240ebdb6c..88e2e88a74 100644
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sdk.dirtydata;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.MessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
@@ -28,19 +27,22 @@ import com.google.common.base.Preconditions;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
+import java.net.InetAddress;
+
@Slf4j
@Builder
-public class InlongSdkDirtySink {
+public class InlongSdkDirtySender {
private String inlongGroupId;
private String inlongStreamId;
private String inlongManagerAddr;
+ private int inlongManagerPort;
private String authId;
private String authKey;
private boolean ignoreErrors;
private SendMessageCallback callback;
- private MessageSender sender;
+ private DefaultMessageSender sender;
public void init() throws Exception {
Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be
null");
@@ -51,8 +53,11 @@ public class InlongSdkDirtySink {
this.callback = new LogCallBack();
ProxyClientConfig proxyClientConfig =
- new ProxyClientConfig(inlongManagerAddr, inlongGroupId,
authId, authKey);
+ new
ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
+ inlongManagerAddr, inlongManagerPort, inlongGroupId,
authId, authKey);
+ proxyClientConfig.setReadProxyIPFromLocal(false);
this.sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+ this.sender.setMsgtype(7);
log.info("init InlongSdkDirtySink successfully, target group={},
stream={}", inlongGroupId, inlongStreamId);
}
@@ -61,6 +66,12 @@ public class InlongSdkDirtySink {
sender.asyncSendMessage(inlongGroupId, inlongStreamId,
messageWrapper.format().getBytes(), callback);
}
+ public void close() {
+ if (sender != null) {
+ sender.close();
+ }
+ }
+
class LogCallBack implements SendMessageCallback {
@Override
diff --git a/inlong-sort/sort-flink/base/pom.xml
b/inlong-sort/sort-flink/base/pom.xml
index 19c36af5fc..fd75d483d9 100644
--- a/inlong-sort/sort-flink/base/pom.xml
+++ b/inlong-sort/sort-flink/base/pom.xml
@@ -38,6 +38,11 @@
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>dirty-data-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index c13c47f66e..7e02b0ca96 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -385,7 +385,7 @@ public final class Constants {
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER
=
ConfigOptions.key("dirty.side-output.field-delimiter")
.stringType()
- .defaultValue(",")
+ .defaultValue("|")
.withDescription("The field-delimiter of dirty
side-output");
public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER =
ConfigOptions.key("dirty.side-output.line-delimiter")
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index 542e9e1977..1ac2b60a1d 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -71,6 +71,14 @@ public class DirtyData<T> {
* The row type of data, it is only used for 'RowData'
*/
private @Nullable final LogicalType rowType;
+ /**
+ * Dirty message data time
+ */
+ private final long dataTime;
+ /**
+ * Dirty message ext params
+ */
+ private @Nullable final String extParams;
/**
* The real dirty data
*/
@@ -78,7 +86,7 @@ public class DirtyData<T> {
public DirtyData(T data, String identifier, String labels,
String logTag, DirtyType dirtyType, String dirtyMessage,
- @Nullable LogicalType rowType) {
+ @Nullable LogicalType rowType, long dataTime, String extParams) {
this.data = data;
this.dirtyType = dirtyType;
this.dirtyMessage = dirtyMessage;
@@ -87,7 +95,8 @@ public class DirtyData<T> {
this.labels = PatternReplaceUtils.replace(labels, paramMap);
this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
-
+ this.dataTime = dataTime == 0 ? System.currentTimeMillis() : dataTime;
+ this.extParams = extParams;
}
public static <T> Builder<T> builder() {
@@ -122,6 +131,18 @@ public class DirtyData<T> {
return identifier;
}
+ public long getDataTime() {
+ return dataTime;
+ }
+
+ public String getExtParams() {
+ return extParams;
+ }
+
+ public String getDirtyMessage() {
+ return dirtyMessage;
+ }
+
@Nullable
public LogicalType getRowType() {
return rowType;
@@ -135,8 +156,20 @@ public class DirtyData<T> {
private DirtyType dirtyType = DirtyType.UNDEFINED;
private String dirtyMessage;
private LogicalType rowType;
+ private long dataTime;
+ private String extParams;
private T data;
+ public Builder<T> setDirtyDataTime(long dataTime) {
+ this.dataTime = dataTime;
+ return this;
+ }
+
+ public Builder<T> setExtParams(String extParams) {
+ this.extParams = extParams;
+ return this;
+ }
+
public Builder<T> setDirtyType(DirtyType dirtyType) {
this.dirtyType = dirtyType;
return this;
@@ -173,7 +206,8 @@ public class DirtyData<T> {
}
public DirtyData<T> build() {
- return new DirtyData<>(data, identifier, labels, logTag,
dirtyType, dirtyMessage, rowType);
+ return new DirtyData<>(data, identifier, labels, logTag, dirtyType,
+ dirtyMessage, rowType, dataTime, extParams);
}
}
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
similarity index 91%
rename from
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
rename to
inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
index 0692d78580..5cb03f0f80 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
@Data
@Builder
@Getter
-public class InlongSdkOptions implements Serializable {
+public class InlongSdkDirtyOptions implements Serializable {
private static final String DEFAULT_FORMAT = "csv";
@@ -36,9 +36,10 @@ public class InlongSdkOptions implements Serializable {
private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
- private String inlongGroupId;
- private String inlongStreamId;
+ private String sendToGroupId;
+ private String sendToStreamId;
private String inlongManagerAddr;
+ private int inlongManagerPort;
private String inlongManagerAuthKey;
private String inlongManagerAuthId;
private String format = DEFAULT_FORMAT;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 8a9d407a4b..4441cca830 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -17,13 +17,9 @@
package org.apache.inlong.sort.base.dirty.sink.sdk;
-import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
-import org.apache.inlong.sdk.dataproxy.MessageSender;
-import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
-import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
-import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sdk.dirtydata.DirtyMessageWrapper;
+import org.apache.inlong.sdk.dirtydata.InlongSdkDirtySender;
import org.apache.inlong.sort.base.dirty.DirtyData;
-import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
import org.apache.inlong.sort.base.util.LabelUtils;
@@ -37,46 +33,54 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Map;
-import java.util.StringJoiner;
@Slf4j
public class InlongSdkDirtySink<T> implements DirtySink<T> {
- private final InlongSdkOptions options;
+ private final InlongSdkDirtyOptions options;
private final DataType physicalRowDataType;
- private final String inlongGroupId;
- private final String inlongStreamId;
- private final SendMessageCallback callback;
- private transient DateTimeFormatter dateTimeFormatter;
private transient RowData.FieldGetter[] fieldGetters;
private transient RowDataToJsonConverters.RowDataToJsonConverter converter;
- private transient MessageSender sender;
+ private transient InlongSdkDirtySender dirtySender;
- public InlongSdkDirtySink(InlongSdkOptions options, DataType
physicalRowDataType) {
+ public InlongSdkDirtySink(InlongSdkDirtyOptions options, DataType
physicalRowDataType) {
this.options = options;
this.physicalRowDataType = physicalRowDataType;
- this.inlongGroupId = options.getInlongGroupId();
- this.inlongStreamId = options.getInlongStreamId();
- this.callback = new LogCallBack();
}
@Override
public void invoke(DirtyData<T> dirtyData) throws Exception {
try {
Map<String, String> labelMap =
LabelUtils.parseLabels(dirtyData.getLabels());
- String groupId =
Preconditions.checkNotNull(labelMap.get("groupId"));
- String streamId =
Preconditions.checkNotNull(labelMap.get("streamId"));
-
- String message = join(groupId, streamId,
- dirtyData.getDirtyType(), dirtyData.getLabels(),
formatData(dirtyData, labelMap));
- sender.asyncSendMessage(inlongGroupId, inlongStreamId,
message.getBytes(), callback);
+ String dataGroupId =
Preconditions.checkNotNull(labelMap.get("groupId"));
+ String dataStreamId =
Preconditions.checkNotNull(labelMap.get("streamId"));
+ String serverType =
Preconditions.checkNotNull(labelMap.get("serverType"));
+ String dataflowId =
Preconditions.checkNotNull(labelMap.get("dataflowId"));
+
+ String dirtyMessage = formatData(dirtyData, labelMap);
+
+ DirtyMessageWrapper wrapper = DirtyMessageWrapper.builder()
+ .delimiter(options.getCsvFieldDelimiter())
+ .inlongGroupId(dataGroupId)
+ .inlongStreamId(dataStreamId)
+ .dataflowId(dataflowId)
+ .dataTime(dirtyData.getDataTime())
+ .serverType(serverType)
+ .dirtyType(dirtyData.getDirtyType().format())
+ .dirtyMessage(dirtyData.getDirtyMessage())
+ .ext(dirtyData.getExtParams())
+ .data(dirtyMessage)
+ .build();
+
+ dirtySender.sendDirtyMessage(wrapper);
} catch (Throwable t) {
log.error("failed to send dirty message to inlong sdk", t);
+ if (!options.isIgnoreSideOutputErrors()) {
+ throw new RuntimeException("failed to send dirty message to
inlong sdk", t);
+ }
}
}
@@ -84,39 +88,28 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
public void open(Configuration configuration) throws Exception {
converter =
FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
fieldGetters =
FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
- dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ log.info("inlong sdk dirty options={}", options);
// init sender
- ProxyClientConfig proxyClientConfig =
- new ProxyClientConfig(options.getInlongManagerAddr(),
options.getInlongGroupId(),
- options.getInlongManagerAuthId(),
options.getInlongManagerAuthKey());
- sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+ dirtySender = InlongSdkDirtySender.builder()
+ .inlongManagerAddr(options.getInlongManagerAddr())
+ .inlongManagerPort(options.getInlongManagerPort())
+ .authId(options.getInlongManagerAuthId())
+ .authKey(options.getInlongManagerAuthKey())
+ .ignoreErrors(options.isIgnoreSideOutputErrors())
+ .inlongGroupId(options.getSendToGroupId())
+ .inlongStreamId(options.getSendToStreamId())
+ .build();
+ dirtySender.init();
}
@Override
public void close() throws Exception {
- if (sender != null) {
- sender.close();
+ if (dirtySender != null) {
+ dirtySender.close();
}
}
- private String join(
- String inlongGroup,
- String inlongStream,
- DirtyType type,
- String label,
- String formattedData) {
-
- String now = LocalDateTime.now().format(dateTimeFormatter);
-
- StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
- return joiner.add(inlongGroup + "." + inlongStream)
- .add(now)
- .add(type.name())
- .add(label)
- .add(formattedData).toString();
- }
-
private String formatData(DirtyData<T> dirtyData, Map<String, String>
labels) throws JsonProcessingException {
String value;
T data = dirtyData.getData();
@@ -158,28 +151,4 @@ public class InlongSdkDirtySink<T> implements DirtySink<T>
{
}
return value;
}
-
- class LogCallBack implements SendMessageCallback {
-
- @Override
- public void onMessageAck(SendResult result) {
- if (result == SendResult.OK) {
- return;
- }
- log.error("failed to send inlong dirty message, response={}",
result);
-
- if (!options.isIgnoreSideOutputErrors()) {
- throw new RuntimeException("writing dirty message to inlong
sdk failed, response=" + result);
- }
- }
-
- @Override
- public void onException(Throwable e) {
- log.error("failed to send inlong dirty message", e);
-
- if (!options.isIgnoreSideOutputErrors()) {
- throw new RuntimeException("writing dirty message to inlong
sdk failed", e);
- }
- }
- }
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
index 000836b667..8d7e399c5c 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -20,32 +20,39 @@ package org.apache.inlong.sort.base.dirty.sink.sdk;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import java.util.HashSet;
import java.util.Set;
-import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+@Slf4j
public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
private static final String IDENTIFIER = "inlong-sdk";
- private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_MANAGER
=
+ private static final ConfigOption<String>
DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR =
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-addr")
.stringType()
.noDefaultValue()
.withDescription("The inlong manager addr to init inlong
sdk");
+ private static final ConfigOption<Integer>
DIRTY_SIDE_OUTPUT_INLONG_MANAGER_PORT =
+
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-port")
+ .intType()
+ .defaultValue(8083)
+ .withDescription("The inlong manager port to init inlong
sdk");
+
private static final ConfigOption<String> DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID
=
ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-id")
.stringType()
@@ -74,24 +81,18 @@ public class InlongSdkDirtySinkFactory implements
DirtySinkFactory {
public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context
context) {
ReadableConfig config =
Configuration.fromMap(context.getCatalogTable().getOptions());
FactoryUtil.validateFactoryOptions(this, config);
- validate(config);
- return new InlongSdkDirtySink<>(getOptions(config),
+ InlongSdkDirtyOptions options = getOptions(config);
+ return new InlongSdkDirtySink<>(options,
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
}
- private void validate(ReadableConfig config) {
- String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
- if (identifier == null || identifier.trim().isEmpty()) {
- throw new ValidationException(
- "The option 'dirty.identifier' is not allowed to be
empty.");
- }
- }
-
- private InlongSdkOptions getOptions(ReadableConfig config) {
- return InlongSdkOptions.builder()
-
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER))
- .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
- .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+ private InlongSdkDirtyOptions getOptions(ReadableConfig config) {
+ return InlongSdkDirtyOptions.builder()
+
.inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR))
+
.inlongManagerPort(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_PORT))
+ .sendToGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
+ .sendToStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
.ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
@@ -107,7 +108,7 @@ public class InlongSdkDirtySinkFactory implements
DirtySinkFactory {
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
- options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER);
+ options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR);
options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID);
options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY);
options.add(DIRTY_SIDE_OUTPUT_INLONG_GROUP);
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
index f863d1cc30..b066347baa 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml
@@ -62,6 +62,7 @@
<groupId>org.apache.inlong</groupId>
<artifactId>sort-common</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
</dependencies>
@@ -86,6 +87,10 @@
<includes>
<include>org.apache.inlong:*</include>
<include>com.fasterxml.*:*</include>
+ <include>com.google.protobuf:*</include>
+ <include>org.apache.commons:*</include>
+ <include>commons-collections:*</include>
+ <include>commons-codec:*</include>
</includes>
</artifactSet>
@@ -111,6 +116,10 @@
<pattern>org.apache.inlong.sort.base</pattern>
<shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.inlong.sort.base</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+
<shadedPattern>org.apache.inlong.sort.tubemq.shaded.org.apache.commons</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index 80532a2dc1..822396c9cb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -29,7 +29,7 @@ import java.io.Serializable;
public interface DynamicTubeMQDeserializationSchema<T> extends Serializable,
ResultTypeQueryable<T> {
@PublicEvolving
- default void open() {
+ default void open() throws Exception {
}
/**
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index 5d4a3bd2c6..a178aa57a8 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -17,26 +17,36 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
import org.apache.inlong.tubemq.corebase.Message;
import com.google.common.base.Objects;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+@Slf4j
public class DynamicTubeMQTableDeserializationSchema implements
DynamicTubeMQDeserializationSchema<RowData> {
/**
@@ -65,26 +75,37 @@ public class DynamicTubeMQTableDeserializationSchema
implements DynamicTubeMQDes
private final MetricOption metricOption;
+ private final DirtySink<byte[]> dirtySink;
+
+ private final DirtyOptions dirtyOptions;
+
public DynamicTubeMQTableDeserializationSchema(
DeserializationSchema<RowData> schema,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean ignoreErrors,
boolean innerFormat,
- MetricOption metricOption) {
+ MetricOption metricOption,
+ DirtySink<byte[]> dirtySink,
+ DirtyOptions dirtyOptions) {
this.deserializationSchema = schema;
this.metadataConverters = metadataConverters;
this.producedTypeInfo = producedTypeInfo;
this.ignoreErrors = ignoreErrors;
this.innerFormat = innerFormat;
this.metricOption = metricOption;
+ this.dirtySink = dirtySink;
+ this.dirtyOptions = dirtyOptions;
}
@Override
- public void open() {
+ public void open() throws Exception {
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption);
}
+ if (dirtySink != null) {
+ dirtySink.open(new Configuration());
+ }
}
@Override
@@ -103,7 +124,41 @@ public class DynamicTubeMQTableDeserializationSchema
implements DynamicTubeMQDes
if (!innerFormat) {
metricsCollector.resetTimestamp(System.currentTimeMillis());
}
- deserializationSchema.deserialize(message.getData(), metricsCollector);
+
+ if (!dirtyOptions.ignoreDirty()) {
+ deserializationSchema.deserialize(message.getData(),
metricsCollector);
+ } else {
+ try {
+ deserializationSchema.deserialize(message.getData(),
metricsCollector);
+ } catch (Throwable t) {
+ if (dirtySink != null) {
+ DirtyData.Builder<byte[]> builder = DirtyData.builder();
+ try {
+
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern("yyyyMMddHHmm");
+ long dataTime =
LocalDateTime.parse(message.getMsgTime(), formatter)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+
+ builder.setData(message.getData())
+ .setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
+ .setDirtyDataTime(dataTime)
+ .setExtParams(message.getAttribute())
+ .setLabels(dirtyOptions.getLabels())
+ .setLogTag(dirtyOptions.getLogTag())
+ .setDirtyMessage(t.getMessage())
+ .setIdentifier(dirtyOptions.getIdentifier());
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new IOException(ex);
+ }
+ log.warn("Dirty sink failed", ex);
+ }
+ }
+ }
+ }
rows.forEach(row -> emitRow(message, (GenericRowData) row, out));
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
index 4962364a9c..22c3306064 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -48,22 +51,13 @@ import java.util.Set;
import java.util.TreeSet;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
-import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.MASTER_RPC;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.SESSION_KEY;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.STREAMID;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC;
-import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.TOPIC_PATTERN;
-import static
org.apache.inlong.sort.tubemq.table.TubeMQOptions.getTubeMQProperties;
+import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.*;
/**
* A dynamic table factory implementation for TubeMQ.
*/
+
public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
public static final String IDENTIFIER = "tubemq-inlong";
@@ -120,10 +114,10 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat = getValueDecodingFormat(helper);
// validate all options
- helper.validateExcept(ExtractNode.INLONG_MSG);
+ helper.validateExcept(ExtractNode.INLONG_MSG, PROPERTIES_PREFIX,
DIRTY_PREFIX);
validatePKConstraints(context.getObjectIdentifier(),
context.getCatalogTable(), valueDecodingFormat);
- innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT));
+ innerFormat =
tableOptions.get(FORMAT).contains(ExtractNode.INLONG_MSG);
final Configuration properties =
getTubeMQProperties(context.getCatalogTable().getOptions());
@@ -133,6 +127,9 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
String auditHostAndPorts = tableOptions.get(INLONG_AUDIT);
String auditKeys = tableOptions.get(AUDIT_KEYS);
+ final DirtyOptions dirtyOptions =
DirtyOptions.fromConfig(tableOptions);
+ final DirtySink<byte[]> dirtySink =
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+
return createTubeMQTableSource(
physicalDataType,
valueDecodingFormat,
@@ -144,7 +141,9 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
properties,
inlongMetric,
auditHostAndPorts,
- auditKeys);
+ auditKeys,
+ dirtySink,
+ dirtyOptions);
}
@Override
@@ -184,7 +183,9 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
Configuration properties,
String inlongMetric,
String auditHostAndPorts,
- String auditKeys) {
+ String auditKeys,
+ DirtySink<byte[]> dirtySink,
+ DirtyOptions dirtyOptions) {
return new TubeMQTableSource(
physicalDataType,
valueDecodingFormat,
@@ -200,7 +201,9 @@ public class TubeMQDynamicTableFactory implements
DynamicTableSourceFactory, Dyn
innerFormat,
inlongMetric,
auditHostAndPorts,
- auditKeys);
+ auditKeys,
+ dirtySink,
+ dirtyOptions);
}
protected TubeMQTableSink createTubeMQTableSink(
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
index 2d5ddbb3d5..d69fcf985e 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer;
@@ -131,6 +133,9 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
private String auditHostAndPorts;
private String auditKeys;
+ private DirtySink<byte[]> dirtySink;
+ private DirtyOptions dirtyOptions;
+
/**
* Watermark strategy that is used to generate per-partition watermark.
*/
@@ -143,7 +148,8 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
TreeSet<String> streamIdSet, String consumerGroup, String
sessionKey,
Configuration configuration, @Nullable WatermarkStrategy<RowData>
watermarkStrategy,
Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean
innerFormat,
- String inlongMetric, String auditHostAndPorts, String auditKeys) {
+ String inlongMetric, String auditHostAndPorts, String auditKeys,
+ DirtySink<byte[]> dirtySink, DirtyOptions dirtyOptions) {
Preconditions.checkNotNull(physicalDataType, "Physical data type must
not be null.");
Preconditions.checkNotNull(valueDecodingFormat, "The deserialization
schema must not be null.");
@@ -170,6 +176,8 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.auditKeys = auditKeys;
+ this.dirtySink = dirtySink;
+ this.dirtyOptions = dirtyOptions;
}
@Override
@@ -200,7 +208,7 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
physicalDataType, valueDecodingFormat, masterAddress,
topic, streamIdSet, consumerGroup, sessionKey, configuration,
watermarkStrategy, proctimeAttribute, ignoreErrors,
innerFormat,
- inlongMetric, auditHostAndPorts, auditKeys);
+ inlongMetric, auditHostAndPorts, auditKeys, dirtySink,
dirtyOptions);
}
@Override
@@ -325,7 +333,8 @@ public class TubeMQTableSource implements ScanTableSource,
SupportsReadingMetada
final DynamicTubeMQDeserializationSchema<RowData> tubeMQDeserializer =
new DynamicTubeMQTableDeserializationSchema(
- deserialization, metadataConverters, producedTypeInfo,
ignoreErrors, innerFormat, metricOption);
+ deserialization, metadataConverters, producedTypeInfo,
ignoreErrors,
+ innerFormat, metricOption, dirtySink, dirtyOptions);
final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new
FlinkTubeMQConsumer(masterAddress, topic, streamIdSet,
consumerGroup, tubeMQDeserializer, configuration, sessionKey,
innerFormat);