This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 73c0b1d148 [INLONG-8267][DataProxy] Add the control of whether to
retry and the count of retries for the failure message (#8271)
73c0b1d148 is described below
commit 73c0b1d1488ffb9520c2661a867b796bf5d6c444
Author: Goson Zhang <[email protected]>
AuthorDate: Sun Jun 18 19:10:11 2023 +0800
[INLONG-8267][DataProxy] Add the control of whether to retry and the count
of retries for the failure message (#8271)
---
.../dataproxy/config/CommonConfigHolder.java | 58 +++++++++++++++------
.../inlong/dataproxy/consts/StatConstants.java | 1 +
.../inlong/dataproxy/sink/common/TubeUtils.java | 3 +-
.../inlong/dataproxy/sink/mq/BatchPackManager.java | 21 ++++----
.../inlong/dataproxy/sink/mq/BatchPackProfile.java | 23 +++++----
.../sink/mq/MessageQueueZoneSinkContext.java | 43 ++++++++++++++++
.../inlong/dataproxy/sink/mq/PackProfile.java | 32 +++++++-----
.../dataproxy/sink/mq/SimplePackProfile.java | 18 ++++---
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 25 +++++----
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 23 +++++----
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 59 +++++++++++++++-------
.../dataproxy/source2/InLongMessageHandler.java | 2 +-
.../source2/httpMsg/InLongHttpMsgHandler.java | 4 +-
.../dataproxy/source2/v0msg/CodecBinMsg.java | 4 +-
.../config/holder/TestCommonConfigHolder.java | 3 ++
.../src/test/resources/common.properties | 4 ++
16 files changed, 220 insertions(+), 103 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
index d6e246978a..be8abdc80b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/CommonConfigHolder.java
@@ -119,7 +119,12 @@ public class CommonConfigHolder {
public static final int VAL_DEF_AUDIT_MAX_CACHE_ROWS = 2000000;
public static final String KEY_AUDIT_FORMAT_INTERVAL_MS =
"auditFormatInterval";
public static final long VAL_DEF_AUDIT_FORMAT_INTERVAL_MS = 60000L;
- // Whether response after save msg
+ // whether to retry after the message send failure
+ public static final String KEY_ENABLE_SEND_RETRY_AFTER_FAILURE =
"send.retry.after.failure";
+ public static final boolean VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE = true;
+ // max retry count
+ public static final String KEY_MAX_RETRIES_AFTER_FAILURE =
"max.retries.after.failure";
+ public static final int VAL_DEF_MAX_RETRIES_AFTER_FAILURE = -1;
public static final String KEY_RESPONSE_AFTER_SAVE = "isResponseAfterSave";
public static final boolean VAL_DEF_RESPONSE_AFTER_SAVE = false;
// Same as KEY_MAX_RESPONSE_TIMEOUT_MS = "maxResponseTimeoutMs";
@@ -182,6 +187,8 @@ public class CommonConfigHolder {
private String fileMetricSourceOutName =
VAL_DEF_FILE_METRIC_SOURCE_OUTPUT_NAME;
private String fileMetricSinkOutName =
VAL_DEF_FILE_METRIC_SINK_OUTPUT_NAME;
private String fileMetricEventOutName =
VAL_DEF_FILE_METRIC_EVENT_OUTPUT_NAME;
+ private boolean enableSendRetryAfterFailure =
VAL_DEF_ENABLE_SEND_RETRY_AFTER_FAILURE;
+ private int maxRetriesAfterFailure = VAL_DEF_MAX_RETRIES_AFTER_FAILURE;
/**
* get instance for common.properties config manager
@@ -362,6 +369,14 @@ public class CommonConfigHolder {
return fileMetricEventOutName;
}
+ public boolean isEnableSendRetryAfterFailure() {
+ return enableSendRetryAfterFailure;
+ }
+
+ public int getMaxRetriesAfterFailure() {
+ return maxRetriesAfterFailure;
+ }
+
private void preReadFields() {
String tmpValue;
// read cluster tag
@@ -403,22 +418,20 @@ public class CommonConfigHolder {
tmpValue = this.props.get(KEY_ENABLE_UNCONFIGURED_TOPIC_ACCEPT);
if (StringUtils.isNotEmpty(tmpValue)) {
this.enableUnConfigTopicAccept =
"TRUE".equalsIgnoreCase(tmpValue.trim());
- if (enableUnConfigTopicAccept) {
- // read default topics
- tmpValue =
this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
- if (StringUtils.isNotBlank(tmpValue)) {
- List<String> tmpList = new ArrayList<>();
- String[] topicItems = tmpValue.split("\\s+");
- for (String item : topicItems) {
- if (StringUtils.isBlank(item)) {
- continue;
- }
- tmpList.add(item.trim());
- }
- if (tmpList.size() > 0) {
- defaultTopics = tmpList;
- }
+ }
+ // read default topics
+ tmpValue = this.props.get(KEY_UNCONFIGURED_TOPIC_DEFAULT_TOPICS);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ List<String> tmpList = new ArrayList<>();
+ String[] topicItems = tmpValue.split("\\s+");
+ for (String item : topicItems) {
+ if (StringUtils.isBlank(item)) {
+ continue;
}
+ tmpList.add(item.trim());
+ }
+ if (tmpList.size() > 0) {
+ defaultTopics = tmpList;
}
}
// read enable whitelist
@@ -548,6 +561,19 @@ public class CommonConfigHolder {
if (StringUtils.isNotEmpty(tmpValue)) {
this.prometheusHttpPort = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_PROMETHEUS_HTTP_PORT);
}
+ // read whether retry send message after sent failure
+ tmpValue = this.props.get(KEY_ENABLE_SEND_RETRY_AFTER_FAILURE);
+ if (StringUtils.isNotEmpty(tmpValue)) {
+ this.enableSendRetryAfterFailure =
"TRUE".equalsIgnoreCase(tmpValue.trim());
+ }
+ // read max retry count
+ tmpValue = this.props.get(KEY_MAX_RETRIES_AFTER_FAILURE);
+ if (StringUtils.isNotBlank(tmpValue)) {
+ int retries = NumberUtils.toInt(tmpValue.trim(),
VAL_DEF_MAX_RETRIES_AFTER_FAILURE);
+ if (retries >= 0) {
+ this.maxRetriesAfterFailure = retries;
+ }
+ }
// initial ip parser
try {
Class<? extends IManagerIpListParser> ipListParserClass =
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
index 973f01451f..45126cc55b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/StatConstants.java
@@ -87,6 +87,7 @@ public class StatConstants {
public static final java.lang.String EVENT_SINK_FAILRETRY = "sink.retry";
public static final java.lang.String EVENT_SINK_FAILDROPPED =
"sink.dropped";
public static final java.lang.String EVENT_SINK_SUCCESS = "sink.success";
+ public static final java.lang.String EVENT_SINK_FAILURE = "sink.failure";
public static final java.lang.String EVENT_SINK_RECEIVEEXCEPT =
"sink.rcvexcept";
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index 5542e8026f..ca4bcceed4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -67,9 +67,8 @@ public class TubeUtils {
message.putSystemHeader(headers.get(Constants.INLONG_STREAM_ID),
DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
} else {
- long dataTimeL =
Long.parseLong(headers.get(AttributeConstants.DATA_TIME));
message.putSystemHeader(headers.get(AttributeConstants.STREAM_ID),
- DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+ headers.get(ConfigConstants.PKG_TIME_KEY));
}
Map<String, String> extraAttrMap = MessageUtils.getXfsAttrs(headers,
pkgVersion);
for (Map.Entry<String, String> entry : extraAttrMap.entrySet()) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 84a1f5c976..9daebf5c90 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -54,18 +54,18 @@ public class BatchPackManager {
private final long dispatchTimeout;
private final long maxPackCount;
private final long maxPackSize;
- private BufferQueue<PackProfile> dispatchQueue;
- private ConcurrentHashMap<String, PackProfile> profileCache = new
ConcurrentHashMap<>();
+ private final BufferQueue<PackProfile> dispatchQueue;
+ private final ConcurrentHashMap<String, PackProfile> profileCache = new
ConcurrentHashMap<>();
// flag that manager need to output overtime data.
- private AtomicBoolean needOutputOvertimeData = new AtomicBoolean(false);
- private AtomicLong inCounter = new AtomicLong(0);
- private AtomicLong outCounter = new AtomicLong(0);
+ private final AtomicBoolean needOutputOvertimeData = new
AtomicBoolean(false);
+ private final AtomicLong inCounter = new AtomicLong(0);
+ private final AtomicLong outCounter = new AtomicLong(0);
/**
* Constructor
*
- * @param context
- * @param dispatchQueue
+ * @param context the process context
+ * @param dispatchQueue the batch queue
*/
public BatchPackManager(Context context, BufferQueue<PackProfile>
dispatchQueue) {
this.dispatchQueue = dispatchQueue;
@@ -76,7 +76,7 @@ public class BatchPackManager {
/**
* addEvent
- * @param event
+ * @param event the event to add
*/
public void addEvent(ProxyEvent event) {
// parse
@@ -106,7 +106,7 @@ public class BatchPackManager {
/**
* addPackEvent
- * @param packEvent
+ * @param packEvent the event need to add
*/
public void addPackEvent(ProxyPackEvent packEvent) {
String eventUid = packEvent.getUid();
@@ -142,7 +142,7 @@ public class BatchPackManager {
/**
* addSimpleEvent
- * @param event
+ * @param event the event need to add
*/
public void addSimpleEvent(SimpleEvent event) {
Map<String, String> headers = event.getHeaders();
@@ -162,7 +162,6 @@ public class BatchPackManager {
/**
* outputOvertimeData
*
- * @return
*/
public void outputOvertimeData() {
if (!needOutputOvertimeData.getAndSet(false)) {
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
index 479b26ee88..99cc05e9e5 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackProfile.java
@@ -37,10 +37,10 @@ public class BatchPackProfile extends PackProfile {
/**
* Constructor
*
- * @param uid
- * @param inlongGroupId
- * @param inlongStreamId
- * @param dispatchTime
+ * @param uid the inlong id
+ * @param inlongGroupId the group id
+ * @param inlongStreamId the stream id
+ * @param dispatchTime the dispatch time
*/
public BatchPackProfile(String uid, String inlongGroupId, String
inlongStreamId, long dispatchTime) {
super(uid, inlongGroupId, inlongStreamId, dispatchTime);
@@ -49,10 +49,10 @@ public class BatchPackProfile extends PackProfile {
/**
* addEvent
*
- * @param event
- * @param maxPackCount
- * @param maxPackSize
- * @return
+ * @param event the event to added
+ * @param maxPackCount the max package count to cached
+ * @param maxPackSize the max package size to cached
+ * @return whether added the event
*/
public boolean addEvent(Event event, long maxPackCount, long maxPackSize) {
long eventLength = event.getBody().length;
@@ -94,7 +94,6 @@ public class BatchPackProfile extends PackProfile {
/**
* fail
- * @return
*/
public void fail(DataProxyErrCode errCode, String errMsg) {
if (callback != null) {
@@ -104,10 +103,12 @@ public class BatchPackProfile extends PackProfile {
/**
* isResend
- * @return
+ * @return whether resend message
*/
public boolean isResend() {
- return callback == null;
+ return callback == null
+ && enableRetryAfterFailure
+ && (maxRetries < 0 || ++retries <= maxRetries);
}
/**
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index c395af814d..15f0879ad0 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -18,7 +18,10 @@
package org.apache.inlong.dataproxy.sink.mq;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
+import org.apache.inlong.dataproxy.consts.AttrConstants;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
@@ -28,6 +31,7 @@ import
org.apache.inlong.sdk.commons.protocol.ProxySdk.INLONG_COMPRESSED_TYPE;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
@@ -272,4 +276,43 @@ public class MessageQueueZoneSinkContext extends
SinkContext {
}
return null;
}
+
+ public void fileMetricAddSuccCnt(PackProfile packProfile, String topic,
String remoteId) {
+ if (!CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ return;
+ }
+ if (packProfile instanceof SimplePackProfile) {
+ SimplePackProfile simpleProfile = (SimplePackProfile) packProfile;
+ StringBuilder statsKey = new StringBuilder(512)
+ .append(sinkName).append(AttrConstants.SEPARATOR)
+
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEPARATOR)
+
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEPARATOR)
+ .append(topic).append(AttrConstants.SEPARATOR)
+
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
+ .append(remoteId).append(AttrConstants.SEPARATOR)
+
.append(simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
+ monitorIndex.addSuccStats(statsKey.toString(), NumberUtils.toInt(
+
simpleProfile.getProperties().get(ConfigConstants.MSG_COUNTER_KEY), 1),
+ 1, simpleProfile.getSize());
+ }
+ }
+
+ public void fileMetricAddFailCnt(PackProfile packProfile, String topic,
String remoteId) {
+ if (!CommonConfigHolder.getInstance().isEnableFileMetric()) {
+ return;
+ }
+
+ if (packProfile instanceof SimplePackProfile) {
+ SimplePackProfile simpleProfile = (SimplePackProfile) packProfile;
+ StringBuilder statsKey = new StringBuilder(512)
+ .append(sinkName).append(AttrConstants.SEPARATOR)
+
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEPARATOR)
+
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEPARATOR)
+ .append(topic).append(AttrConstants.SEPARATOR)
+
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
+ .append(remoteId).append(AttrConstants.SEPARATOR)
+
.append(simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
+ monitorIndex.addFailStats(statsKey.toString(), 1);
+ }
+ }
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
index 9541091347..1867e6a8ce 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/PackProfile.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.sink.mq;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.flume.Event;
@@ -34,20 +35,24 @@ public abstract class PackProfile {
private final String uid;
protected long count = 0;
protected long size = 0;
-
+ protected final boolean enableRetryAfterFailure;
+ protected final int maxRetries;
+ protected int retries = 0;
/**
* Constructor
*
- * @param uid
- * @param inlongGroupId
- * @param inlongStreamId
- * @param dispatchTime
+ * @param uid the inlong id
+ * @param inlongGroupId the group id
+ * @param inlongStreamId the stream id
+ * @param dispatchTime the dispatch time
*/
public PackProfile(String uid, String inlongGroupId, String
inlongStreamId, long dispatchTime) {
this.uid = uid;
this.inlongGroupId = inlongGroupId;
this.inlongStreamId = inlongStreamId;
this.dispatchTime = dispatchTime;
+ this.enableRetryAfterFailure =
CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure();
+ this.maxRetries =
CommonConfigHolder.getInstance().getMaxRetriesAfterFailure();
}
/**
@@ -80,7 +85,7 @@ public abstract class PackProfile {
/**
* getDispatchTime
*
- * @return
+ * @return the dispatch time
*/
public long getDispatchTime() {
return dispatchTime;
@@ -125,8 +130,8 @@ public abstract class PackProfile {
/**
* isTimeout
*
- * @param createThreshold
- * @return
+ * @param createThreshold the creation threshold
+ * @return whether time out
*/
public boolean isTimeout(long createThreshold) {
return createThreshold >= createTime;
@@ -139,23 +144,22 @@ public abstract class PackProfile {
/**
* fail
- * @return
*/
public abstract void fail(DataProxyErrCode errCode, String errMsg);
/**
* isResend
- * @return
+ * @return whether resend message
*/
public abstract boolean isResend();
/**
* addEvent
*
- * @param event
- * @param maxPackCount
- * @param maxPackSize
- * @return
+ * @param event the event need to add
+ * @param maxPackCount the max package count
+ * @param maxPackSize the max package size
+ * @return whether add success
*/
public abstract boolean addEvent(Event event, long maxPackCount, long
maxPackSize);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index e701509822..99d6ac63c9 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -53,10 +53,10 @@ public class SimplePackProfile extends PackProfile {
/**
* Constructor
- * @param uid
- * @param inlongGroupId
- * @param inlongStreamId
- * @param dispatchTime
+ * @param uid the inlong id
+ * @param inlongGroupId the group id
+ * @param inlongStreamId the stream id
+ * @param dispatchTime the received time
*/
public SimplePackProfile(String uid, String inlongGroupId, String
inlongStreamId, long dispatchTime) {
super(uid, inlongGroupId, inlongStreamId, dispatchTime);
@@ -80,7 +80,9 @@ public class SimplePackProfile extends PackProfile {
@Override
public boolean isResend() {
- return !needRspEvent;
+ return !needRspEvent
+ && enableRetryAfterFailure
+ && (maxRetries < 0 || ++retries <= maxRetries);
}
@Override
@@ -101,9 +103,9 @@ public class SimplePackProfile extends PackProfile {
}
/**
- * create
- * @param event
- * @return
+ * create simple pack profile
+ * @param event the event to process
+ * @return the package profile
*/
public static SimplePackProfile create(Event event) {
Map<String, String> headers = event.getHeaders();
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index 86e0fddefc..40a2b4aa7a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -62,12 +62,12 @@ public class KafkaHandler implements MessageQueueHandler {
// kafka producer
private KafkaProducer<String, byte[]> producer;
- private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
+ private final ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
/**
* init
- * @param config
- * @param sinkContext
+ * @param config the kafka cluster configure
+ * @param sinkContext the sink context
*/
@Override
public void init(CacheClusterConfig config, MessageQueueZoneSinkContext
sinkContext) {
@@ -112,14 +112,14 @@ public class KafkaHandler implements MessageQueueHandler {
}
/**
- * send
- * @param profile
- * @return
+ * send message to mq
+ * @param profile the profile need to send
+ * @return whether sent the profile
*/
@Override
public boolean send(PackProfile profile) {
+ String topic = null;
try {
- String topic;
// get idConfig
IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
@@ -158,7 +158,8 @@ public class KafkaHandler implements MessageQueueHandler {
}
return true;
} catch (Exception ex) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
+ profile.getUid() + "." + topic);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -201,7 +202,7 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public void onCompletion(RecordMetadata arg0, Exception ex) {
if (ex != null) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
sinkContext.processSendFail(batchProfile, clusterName,
topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -245,13 +246,17 @@ public class KafkaHandler implements MessageQueueHandler {
@Override
public void onCompletion(RecordMetadata arg0, Exception ex) {
if (ex != null) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
topic);
+ sinkContext.fileMetricAddFailCnt(simpleProfile, topic,
+ arg0 == null ? "" :
String.valueOf(arg0.partition()));
sinkContext.processSendFail(simpleProfile, clusterName,
topic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
LOG.error("Send SimplePackProfile to Kafka failure",
ex);
}
} else {
+ sinkContext.fileMetricAddSuccCnt(simpleProfile, topic,
+ arg0 == null ? "" :
String.valueOf(arg0.partition()));
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile,
clusterName, topic, true, sendTime);
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index 7800e615fa..e0a57e5e94 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -95,7 +95,7 @@ public class PulsarHandler implements MessageQueueHandler {
private String tenant;
private String namespace;
- private ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
+ private final ThreadLocal<EventHandler> handlerLocal = new ThreadLocal<>();
/**
* pulsar client
@@ -103,12 +103,12 @@ public class PulsarHandler implements MessageQueueHandler
{
private PulsarClient client;
private ProducerBuilder<byte[]> baseBuilder;
- private ConcurrentHashMap<String, Producer<byte[]>> producerMap = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, Producer<byte[]>> producerMap =
new ConcurrentHashMap<>();
/**
* init
- * @param config
- * @param sinkContext
+ * @param config the Pulsar cluster configure
+ * @param sinkContext the sink context
*/
public void init(CacheClusterConfig config, MessageQueueZoneSinkContext
sinkContext) {
this.config = config;
@@ -198,13 +198,13 @@ public class PulsarHandler implements MessageQueueHandler
{
/**
* send
- * @param profile
- * @return
+ * @param profile the profile to send
+ * @return whether sent the profile
*/
@Override
public boolean send(PackProfile profile) {
+ String producerTopic = null;
try {
- String producerTopic;
// get idConfig
IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
@@ -266,7 +266,8 @@ public class PulsarHandler implements MessageQueueHandler {
}
return true;
} catch (Exception ex) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
+ profile.getUid() + "." + producerTopic);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -323,7 +324,7 @@ public class PulsarHandler implements MessageQueueHandler {
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
sinkContext.processSendFail(batchProfile, clusterName,
producerTopic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -357,13 +358,15 @@ public class PulsarHandler implements MessageQueueHandler
{
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
producerTopic);
+ sinkContext.fileMetricAddFailCnt(simpleProfile, producerTopic,
msgId.toString());
sinkContext.processSendFail(simpleProfile, clusterName,
producerTopic, sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
LOG.error("Send SimpleProfileV0 to Pulsar failure", ex);
}
} else {
+ sinkContext.fileMetricAddSuccCnt(simpleProfile, producerTopic,
msgId.toString());
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
sinkContext.addSendResultMetric(simpleProfile, clusterName,
producerTopic, true, sendTime);
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index 0b5ee8cffa..d78e282d78 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -78,8 +78,8 @@ public class TubeHandler implements MessageQueueHandler {
/**
* init
- * @param config
- * @param sinkContext
+ * @param config the cluster configure
+ * @param sinkContext the sink context
*/
@Override
public void init(CacheClusterConfig config, MessageQueueZoneSinkContext
sinkContext) {
@@ -124,7 +124,7 @@ public class TubeHandler implements MessageQueueHandler {
/**
* initTubeConfig
- * @return
+ * @return the client configure
*
* @throws Exception
*/
@@ -182,9 +182,9 @@ public class TubeHandler implements MessageQueueHandler {
* send
*/
public boolean send(PackProfile profile) {
+ String topic = null;
try {
// idConfig
- String topic;
IdTopicConfig idConfig =
ConfigManager.getInstance().getIdTopicConfig(
profile.getInlongGroupId(), profile.getInlongStreamId());
if (idConfig == null) {
@@ -227,11 +227,14 @@ public class TubeHandler implements MessageQueueHandler {
this.sendBatchPackProfile((BatchPackProfile) profile,
idConfig, topic);
}
return true;
- } catch (Exception ex) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SEND_EXCEPTION);
+ } catch (Throwable ex) {
+ sinkContext.fileMetricIncWithDetailStats(
+ StatConstants.EVENT_SINK_SEND_EXCEPTION, profile.getUid()
+ "." + topic);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
- LOG.error(ex.getMessage(), ex);
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send Message to Tube failure", ex);
+ }
return false;
}
}
@@ -265,10 +268,19 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onMessageSent(MessageSentResult result) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
- sinkContext.addSendResultMetric(batchProfile, clusterName,
topic, true, sendTime);
- sinkContext.getDispatchQueue().release(batchProfile.getSize());
- batchProfile.ack();
+ if (result.isSuccess()) {
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
+ sinkContext.addSendResultMetric(batchProfile, clusterName,
topic, true, sendTime);
+
sinkContext.getDispatchQueue().release(batchProfile.getSize());
+ batchProfile.ack();
+ } else {
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_FAILURE);
+ sinkContext.processSendFail(batchProfile, clusterName,
topic, sendTime,
+ DataProxyErrCode.MQ_RETURN_ERROR,
result.getErrMsg());
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send ProfileV1 to tube failure {}",
result.getErrMsg());
+ }
+ }
}
@Override
@@ -277,7 +289,7 @@ public class TubeHandler implements MessageQueueHandler {
sinkContext.processSendFail(batchProfile, clusterName, topic,
sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send ProfileV1 to tube failure", ex);
+ LOG.error("Send ProfileV1 to tube exception", ex);
}
}
};
@@ -299,19 +311,32 @@ public class TubeHandler implements MessageQueueHandler {
@Override
public void onMessageSent(MessageSentResult result) {
-
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
- sinkContext.addSendResultMetric(simpleProfile, clusterName,
topic, true, sendTime);
-
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
- simpleProfile.ack();
+ if (result.isSuccess()) {
+ sinkContext.fileMetricAddSuccCnt(simpleProfile, topic,
result.getPartition().getHost());
+
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_SUCCESS);
+ sinkContext.addSendResultMetric(simpleProfile,
clusterName, topic, true, sendTime);
+
sinkContext.getDispatchQueue().release(simpleProfile.getSize());
+ simpleProfile.ack();
+ } else {
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_FAILURE,
+ topic + "." + result.getErrCode());
+ sinkContext.fileMetricAddFailCnt(simpleProfile, topic,
result.getPartition().getHost());
+ sinkContext.processSendFail(simpleProfile, clusterName,
topic, sendTime,
+ DataProxyErrCode.MQ_RETURN_ERROR,
result.getErrMsg());
+ if (logCounter.shouldPrint()) {
+ LOG.error("Send SimpleProfileV0 to tube failure: {}",
result.getErrMsg());
+ }
+ }
}
@Override
public void onException(Throwable ex) {
sinkContext.fileMetricIncSumStats(StatConstants.EVENT_SINK_RECEIVEEXCEPT);
+ sinkContext.fileMetricAddFailCnt(simpleProfile, topic, "");
sinkContext.processSendFail(simpleProfile, clusterName, topic,
sendTime,
DataProxyErrCode.MQ_RETURN_ERROR, ex.getMessage());
if (logCounter.shouldPrint()) {
- LOG.error("Send SimpleProfileV0 to tube failure", ex);
+ LOG.error("Send SimpleProfileV0 to tube exception", ex);
}
}
};
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
index a4d094872b..1ff018786b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/InLongMessageHandler.java
@@ -295,7 +295,7 @@ public class InLongMessageHandler extends
ChannelInboundHandlerAdapter {
}
} catch (Throwable ex) {
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
- source.fileMetricAddFailCnt(statsKey, msgCodec.getMsgCount());
+ source.fileMetricAddFailCnt(statsKey, 1);
source.addMetric(false, event.getBody().length, event);
if (msgCodec.isNeedResp() && !msgCodec.isOrderOrProxy()) {
msgCodec.setFailureInfo(DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index fd016f4857..2244207f06 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -332,6 +332,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
.append(AttributeConstants.KEY_VALUE_SEPARATOR).append(msgRcvTime);
inLongMsg.addMsg(strBuff.toString(),
body.getBytes(HttpAttrConst.VAL_DEF_CHARSET));
byte[] inlongMsgData = inLongMsg.buildArray();
+ long pkgTime = inLongMsg.getCreatetime();
inLongMsg.reset();
strBuff.delete(0, strBuff.length());
// build flume event
@@ -344,6 +345,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
InLongMsgVer.INLONG_V0.getName());
eventHeaders.put(AttributeConstants.RCV_TIME,
String.valueOf(msgRcvTime));
+ eventHeaders.put(ConfigConstants.PKG_TIME_KEY,
DateTimeUtils.ms2yyyyMMddHHmm(pkgTime));
Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
// build metric data item
dataTime = dataTime / 1000 / 60 / 10;
@@ -366,7 +368,7 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
return true;
} catch (Throwable ex) {
source.fileMetricIncSumStats(StatConstants.EVENT_MSG_V0_POST_FAILURE);
- source.fileMetricAddFailCnt(statsKey, intMsgCnt);
+ source.fileMetricAddFailCnt(statsKey, 1);
source.addMetric(false, event.getBody().length, event);
sendErrorMsg(ctx, DataProxyErrCode.PUT_EVENT_TO_CHANNEL_FAILURE,
strBuff.append("Put event to channel failure:
").append(ex.getMessage()).toString());
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
index d5dd70c7a4..9f31e7f97e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/CodecBinMsg.java
@@ -272,7 +272,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
}
return false;
}
- if (StringUtils.isNotBlank(this.groupId) &&
!this.groupId.equalsIgnoreCase(confGroupId)) {
+ if (StringUtils.isNotBlank(this.groupId) &&
!this.groupId.equals(confGroupId)) {
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_GROUP_IDNUM_INCONSTANT);
this.errCode = DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
this.errMsg = String.format(
@@ -305,7 +305,7 @@ public class CodecBinMsg extends AbsV0MsgCodec {
}
return false;
}
- if (StringUtils.isNotBlank(this.streamId) &&
!this.streamId.equalsIgnoreCase(confStreamId)) {
+ if (StringUtils.isNotBlank(this.streamId) &&
!this.streamId.equals(confStreamId)) {
source.fileMetricIncSumStats(StatConstants.EVENT_CONFIG_STREAM_IDNUM_INCONSTANT);
this.errCode =
DataProxyErrCode.GROUPID_OR_STREAMID_INCONSTANT;
this.errMsg = String.format(
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
index 9f0111e21e..8e02c66b0e 100644
---
a/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
+++
b/inlong-dataproxy/dataproxy-source/src/test/java/org/apache/inlong/dataproxy/config/holder/TestCommonConfigHolder.java
@@ -38,5 +38,8 @@ public class TestCommonConfigHolder {
Assert.assertEquals(10000,
CommonConfigHolder.getInstance().getMetaConfigSyncInvlMs());
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableUnConfigTopicAccept());
Assert.assertTrue(CommonConfigHolder.getInstance().getDefTopics().contains("test2"));
+
Assert.assertTrue(CommonConfigHolder.getInstance().isEnableSendRetryAfterFailure());
+ Assert.assertEquals(2,
CommonConfigHolder.getInstance().getMaxRetriesAfterFailure());
}
+
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
index 4ee6350595..e37ef6cfeb 100644
--- a/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
+++ b/inlong-dataproxy/dataproxy-source/src/test/resources/common.properties
@@ -30,3 +30,7 @@ meta.config.sync.interval.ms=10000
id2topic.unconfigured.accept.enable=true
# the default topic if accept unconfigured topic's message
id2topic.unconfigured.default.topics= test1 test2 test3
+# whether to retry sending a message after a send failure
+send.retry.after.failure = true
+# maximum number of retries after a send failure
+max.retries.after.failure=2