This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f493862b58 [INLONG-11493][SDK] Inlong SDK Dirty Sink supports retry sending (#11498)
f493862b58 is described below
commit f493862b5825dffe297376d94538908400158a63
Author: vernedeng <[email protected]>
AuthorDate: Thu Nov 14 12:19:59 2024 +0800
[INLONG-11493][SDK] Inlong SDK Dirty Sink supports retry sending (#11498)
---
.../inlong/sdk/dirtydata/DirtyMessageWrapper.java | 8 +++
.../inlong/sdk/dirtydata/InlongSdkDirtySender.java | 71 +++++++++++++++++-----
.../apache/inlong/sort/base/dirty/DirtyData.java | 18 +++++-
.../sort/base/dirty/sink/DirtyServerType.java | 37 +++++++++++
.../base/dirty/sink/sdk/InlongSdkDirtyOptions.java | 2 +
.../base/dirty/sink/sdk/InlongSdkDirtySink.java | 5 +-
.../dirty/sink/sdk/InlongSdkDirtySinkFactory.java | 13 +++-
.../DynamicTubeMQTableDeserializationSchema.java | 2 +
8 files changed, 136 insertions(+), 20 deletions(-)
diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
index 977e002478..a82d574cac 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sdk.dirtydata;
import lombok.Builder;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.text.StringEscapeUtils;
@@ -34,6 +35,9 @@ public class DirtyMessageWrapper {
private static DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private String delimiter;
+ @Builder.Default
+ @Getter
+ private int retryTimes = 0;
private String inlongGroupId;
private String inlongStreamId;
@@ -71,4 +75,8 @@ public class DirtyMessageWrapper {
.add(StringEscapeUtils.escapeXSI(formatData))
.toString();
}
+
+ public void increaseRetry() {
+ retryTimes++;
+ }
}
diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
index 88e2e88a74..80cc596c26 100644
--- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
+++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySender.java
@@ -21,13 +21,17 @@ import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
-import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import com.google.common.base.Preconditions;
import lombok.Builder;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.net.InetAddress;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
@Slf4j
@Builder
@@ -40,9 +44,14 @@ public class InlongSdkDirtySender {
private String authId;
private String authKey;
private boolean ignoreErrors;
+ private int maxRetryTimes;
+ private int maxCallbackSize;
+ @Builder.Default
+ private boolean closed = false;
- private SendMessageCallback callback;
+ private LinkedBlockingQueue<DirtyMessageWrapper> dirtyDataQueue;
private DefaultMessageSender sender;
+ private Executor executor;
public void init() throws Exception {
Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be
null");
@@ -51,45 +60,79 @@ public class InlongSdkDirtySender {
Preconditions.checkNotNull(authId, "authId cannot be null");
Preconditions.checkNotNull(authKey, "authKey cannot be null");
- this.callback = new LogCallBack();
ProxyClientConfig proxyClientConfig =
new
ProxyClientConfig(InetAddress.getLocalHost().getHostAddress(), true,
inlongManagerAddr, inlongManagerPort, inlongGroupId,
authId, authKey);
proxyClientConfig.setReadProxyIPFromLocal(false);
+ proxyClientConfig.setAsyncCallbackSize(maxCallbackSize);
this.sender =
DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
this.sender.setMsgtype(7);
+
+ this.dirtyDataQueue = new LinkedBlockingQueue<>(maxCallbackSize);
+ this.executor = Executors.newSingleThreadExecutor();
+ executor.execute(this::doSendDirtyMessage);
log.info("init InlongSdkDirtySink successfully, target group={},
stream={}", inlongGroupId, inlongStreamId);
}
- public void sendDirtyMessage(DirtyMessageWrapper messageWrapper)
- throws ProxysdkException {
- sender.asyncSendMessage(inlongGroupId, inlongStreamId,
messageWrapper.format().getBytes(), callback);
+ public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) throws
InterruptedException {
+ dirtyDataQueue.offer(messageWrapper, 10, TimeUnit.SECONDS);
+ }
+
+ private void doSendDirtyMessage() {
+ while (!closed) {
+ try {
+ DirtyMessageWrapper messageWrapper = dirtyDataQueue.poll();
+ if (messageWrapper == null) {
+ Thread.sleep(100L);
+ continue;
+ }
+ messageWrapper.increaseRetry();
+ if (messageWrapper.getRetryTimes() > maxRetryTimes) {
+ log.error("failed to send dirty message after {} times,
dirty data ={}", maxRetryTimes,
+ messageWrapper);
+ continue;
+ }
+
+ sender.asyncSendMessage(inlongGroupId, inlongStreamId,
+ messageWrapper.format().getBytes(), new
LogCallBack(messageWrapper));
+
+ } catch (Throwable t) {
+ log.error("failed to send inlong dirty message", t);
+ if (!ignoreErrors) {
+ throw new RuntimeException("writing dirty message to
inlong sdk failed", t);
+ }
+ }
+
+ }
}
public void close() {
+ closed = true;
+ dirtyDataQueue.clear();
if (sender != null) {
sender.close();
}
}
+ @Getter
class LogCallBack implements SendMessageCallback {
+ private final DirtyMessageWrapper wrapper;
+
+ public LogCallBack(DirtyMessageWrapper wrapper) {
+ this.wrapper = wrapper;
+ }
+
@Override
public void onMessageAck(SendResult result) {
- if (result == SendResult.OK) {
- return;
- }
- log.error("failed to send inlong dirty message, response={}",
result);
-
- if (!ignoreErrors) {
- throw new RuntimeException("writing dirty message to inlong
sdk failed, response=" + result);
+ if (SendResult.OK != result) {
+ dirtyDataQueue.offer(wrapper);
}
}
@Override
public void onException(Throwable e) {
log.error("failed to send inlong dirty message", e);
-
if (!ignoreErrors) {
throw new RuntimeException("writing dirty message to inlong
sdk failed", e);
}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index 1ac2b60a1d..24c5dddecd 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.base.dirty;
+import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.util.PatternReplaceUtils;
import org.apache.flink.table.types.logical.LogicalType;
@@ -63,6 +64,8 @@ public class DirtyData<T> {
* Dirty type
*/
private final DirtyType dirtyType;
+
+ private final DirtyServerType serverType;
/**
* Dirty describe message, it is the cause of dirty data
*/
@@ -85,10 +88,11 @@ public class DirtyData<T> {
private final T data;
public DirtyData(T data, String identifier, String labels,
- String logTag, DirtyType dirtyType, String dirtyMessage,
+ String logTag, DirtyType dirtyType, DirtyServerType serverType,
String dirtyMessage,
@Nullable LogicalType rowType, long dataTime, String extParams) {
this.data = data;
this.dirtyType = dirtyType;
+ this.serverType = serverType;
this.dirtyMessage = dirtyMessage;
this.rowType = rowType;
Map<String, String> paramMap = genParamMap();
@@ -127,6 +131,10 @@ public class DirtyData<T> {
return dirtyType;
}
+ public DirtyServerType getServerType() {
+ return serverType;
+ }
+
public String getIdentifier() {
return identifier;
}
@@ -154,6 +162,7 @@ public class DirtyData<T> {
private String labels;
private String logTag;
private DirtyType dirtyType = DirtyType.UNDEFINED;
+ private DirtyServerType serverType = DirtyServerType.UNDEFINED;
private String dirtyMessage;
private LogicalType rowType;
private long dataTime;
@@ -175,6 +184,11 @@ public class DirtyData<T> {
return this;
}
+ public Builder<T> setServerType(DirtyServerType serverType) {
+ this.serverType = serverType;
+ return this;
+ }
+
public Builder<T> setLabels(String labels) {
this.labels = labels;
return this;
@@ -206,7 +220,7 @@ public class DirtyData<T> {
}
public DirtyData<T> build() {
- return new DirtyData<>(data, identifier, labels, logTag, dirtyType,
+ return new DirtyData<>(data, identifier, labels, logTag,
dirtyType, serverType,
dirtyMessage, rowType, dataTime, extParams);
}
}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
new file mode 100644
index 0000000000..63f993c146
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink;
+
+public enum DirtyServerType {
+
+ UNDEFINED("Undefined"),
+ TUBE_MQ("TubeMQ"),
+ ICEBERG("Iceberg")
+
+ ;
+
+ private final String format;
+
+ DirtyServerType(String format) {
+ this.format = format;
+ }
+
+ public String format() {
+ return format;
+ }
+}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
index 5cb03f0f80..84581dd4ce 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtyOptions.java
@@ -49,4 +49,6 @@ public class InlongSdkDirtyOptions implements Serializable {
private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
+ private int retryTimes;
+ private int maxCallbackSize;
}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 4441cca830..8513f841bc 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -57,7 +57,6 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
Map<String, String> labelMap =
LabelUtils.parseLabels(dirtyData.getLabels());
String dataGroupId =
Preconditions.checkNotNull(labelMap.get("groupId"));
String dataStreamId =
Preconditions.checkNotNull(labelMap.get("streamId"));
- String serverType =
Preconditions.checkNotNull(labelMap.get("serverType"));
String dataflowId =
Preconditions.checkNotNull(labelMap.get("dataflowId"));
String dirtyMessage = formatData(dirtyData, labelMap);
@@ -68,7 +67,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
.inlongStreamId(dataStreamId)
.dataflowId(dataflowId)
.dataTime(dirtyData.getDataTime())
- .serverType(serverType)
+ .serverType(dirtyData.getServerType().format())
.dirtyType(dirtyData.getDirtyType().format())
.dirtyMessage(dirtyData.getDirtyMessage())
.ext(dirtyData.getExtParams())
@@ -99,6 +98,8 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
.ignoreErrors(options.isIgnoreSideOutputErrors())
.inlongGroupId(options.getSendToGroupId())
.inlongStreamId(options.getSendToStreamId())
+ .maxRetryTimes(options.getRetryTimes())
+ .maxCallbackSize(options.getMaxCallbackSize())
.build();
dirtySender.init();
}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
index 8d7e399c5c..053aa58364 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -35,6 +35,7 @@ import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELI
import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_RETRIES;
@Slf4j
public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
@@ -77,6 +78,12 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
.noDefaultValue()
.withDescription("The inlong stream id of dirty sink");
+ private static final ConfigOption<Integer>
DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE =
+ ConfigOptions.key("dirty.side-output.inlong-sdk.max-callback-size")
+ .intType()
+ .defaultValue(100000)
+ .withDescription("The inlong stream id of dirty sink");
+
@Override
public <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context
context) {
ReadableConfig config =
Configuration.fromMap(context.getCatalogTable().getOptions());
@@ -95,8 +102,10 @@ public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
.csvFieldDelimiter(config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER))
.inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
.inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
-
.ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
- .enableDirtyLog(true)
+
.ignoreSideOutputErrors(config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS))
+ .retryTimes(config.get(DIRTY_SIDE_OUTPUT_RETRIES))
+
.maxCallbackSize(config.get(DIRTY_SIDE_OUTPUT_MAX_CALLBACK_SIZE))
+ .enableDirtyLog(config.get(DIRTY_SIDE_OUTPUT_LOG_ENABLE))
.build();
}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index a178aa57a8..94631e7cd3 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.tubemq.table;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
@@ -143,6 +144,7 @@ public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDes
builder.setData(message.getData())
.setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
+ .setServerType(DirtyServerType.TUBE_MQ)
.setDirtyDataTime(dataTime)
.setExtParams(message.getAttribute())
.setLabels(dirtyOptions.getLabels())