This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 221a1ddc5f [INLONG-8284][DataProxy] Unify the message encoding
definition of DataProxy (#8285)
221a1ddc5f is described below
commit 221a1ddc5fabeada93cb5b1339b8d4a6f559921e
Author: Goson Zhang <[email protected]>
AuthorDate: Mon Jun 19 22:14:25 2023 +0800
[INLONG-8284][DataProxy] Unify the message encoding definition of DataProxy
(#8285)
---
.../inlong/common/enums/DataProxyMsgEncType.java | 29 ++++++++++++++--------
.../dataproxy/http/SimpleMessageHandler.java | 8 ++++--
.../inlong/dataproxy/metrics/audit/AuditUtils.java | 4 +--
.../dataproxy/sink/common/DefaultEventHandler.java | 8 +++---
.../inlong/dataproxy/sink/common/TubeUtils.java | 4 +--
.../inlong/dataproxy/sink/mq/BatchPackManager.java | 24 ++++++++++++------
.../dataproxy/sink/mq/MessageQueueZoneSink.java | 8 +++---
.../sink/mq/MessageQueueZoneSinkContext.java | 12 ++++-----
.../dataproxy/sink/mq/SimplePackProfile.java | 19 ++++++++++++++
.../dataproxy/sink/mq/kafka/KafkaHandler.java | 5 ++--
.../dataproxy/sink/mq/pulsar/PulsarHandler.java | 5 ++--
.../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 26 +++++++++++--------
.../dataproxy/source/ServerMessageHandler.java | 8 ++++--
.../dataproxy/source/SimpleMessageHandler.java | 9 ++++---
.../source2/httpMsg/InLongHttpMsgHandler.java | 8 ++++--
.../dataproxy/source2/v0msg/AbsV0MsgCodec.java | 14 +++++------
.../inlong/dataproxy/utils/MessageUtils.java | 3 ++-
.../sdk/commons/protocol/EventConstants.java | 2 ++
.../sdk/sort/impl/decode/MessageDeserializer.java | 29 +++++++++-------------
19 files changed, 137 insertions(+), 88 deletions(-)
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
similarity index 58%
rename from
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
rename to
inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
index 9dcac24f21..1335cfcf6a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/InLongMsgVer.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataProxyMsgEncType.java
@@ -15,14 +15,19 @@
* limitations under the License.
*/
-package org.apache.inlong.dataproxy.utils;
+package org.apache.inlong.common.enums;
-public enum InLongMsgVer {
+/**
+ * Enumeration class of encoding format of data output from DataProxy to MQ
+ */
+public enum DataProxyMsgEncType {
- INLONG_V0(0, "V0", "The inlong-msg V0 format"),
- INLONG_V1(1, "V1", "The inlong-msg V1 format");
+ MSG_ENCODE_TYPE_RAW(0, "Raw", "Raw message without any InLong format"),
+ MSG_ENCODE_TYPE_PB(1, "PB", "The PB MessagePack encode format"),
+ MSG_ENCODE_TYPE_INLONGMSG(2, "InLongMsg", "The InLongMsg encode format"),
+ MSG_ENCODE_TYPE_UNKNOWN(99, "Unknown", "Unknown encode format");
- InLongMsgVer(int id, String name, String desc) {
+ DataProxyMsgEncType(int id, String name, String desc) {
this.id = id;
this.name = name;
this.desc = desc;
@@ -32,6 +37,10 @@ public enum InLongMsgVer {
return id;
}
+ public String getStrId() {
+ return String.valueOf(id);
+ }
+
public String getName() {
return name;
}
@@ -40,13 +49,13 @@ public enum InLongMsgVer {
return desc;
}
- public static InLongMsgVer valueOf(int value) {
- for (InLongMsgVer inLongMsgVer : InLongMsgVer.values()) {
- if (inLongMsgVer.getId() == value) {
- return inLongMsgVer;
+ public static DataProxyMsgEncType valueOf(int value) {
+ for (DataProxyMsgEncType msgEncType : DataProxyMsgEncType.values()) {
+ if (msgEncType.getId() == value) {
+ return msgEncType;
}
}
- return INLONG_V0;
+ return MSG_ENCODE_TYPE_UNKNOWN;
}
private final int id;
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index e2eba723a3..2a37012d0d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.http;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.AttributeConstants;
@@ -30,8 +31,8 @@ import
org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.source.ServiceDecoder;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
@@ -156,7 +157,10 @@ public class SimpleMessageHandler implements
MessageHandler {
headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
headers.put(ConfigConstants.REMOTE_IDC_KEY, DEFAULT_REMOTE_IDC_VALUE);
headers.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
- headers.put(ConfigConstants.MSG_ENCODE_VER,
InLongMsgVer.INLONG_V0.getName());
+ headers.put(ConfigConstants.MSG_ENCODE_VER,
+ DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ headers.put(EventConstants.HEADER_KEY_VERSION,
+ DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
byte[] data = inLongMsg.buildArray();
headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
Event event = EventBuilder.withBody(data, headers);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
index 5fb13fb7a3..258bc24255 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/audit/AuditUtils.java
@@ -19,12 +19,12 @@ package org.apache.inlong.dataproxy.metrics.audit;
import org.apache.inlong.audit.AuditOperator;
import org.apache.inlong.audit.util.AuditConfig;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.dataproxy.config.CommonConfigHolder;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
@@ -64,7 +64,7 @@ public class AuditUtils {
}
Map<String, String> headers = event.getHeaders();
String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
- if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+ if
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
{
String inlongGroupId =
DataProxyMetricItem.getInlongGroupId(headers);
String inlongStreamId =
DataProxyMetricItem.getInlongStreamId(headers);
long logTime = getLogTime(headers);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
index c077ebd9a8..88430d2eba 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/DefaultEventHandler.java
@@ -17,7 +17,9 @@
package org.apache.inlong.dataproxy.sink.common;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.commons.protocol.ProxyEvent;
@@ -35,9 +37,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_CACHE_VERSION_1;
-import static
org.apache.inlong.sdk.commons.protocol.EventConstants.HEADER_KEY_VERSION;
-
/**
* DefaultEventHandler
*
@@ -52,7 +51,8 @@ public class DefaultEventHandler implements EventHandler {
INLONG_COMPRESSED_TYPE compressType) {
Map<String, String> headers = new HashMap<>();
// version int32 protocol version, the value is 1
- headers.put(HEADER_KEY_VERSION, HEADER_CACHE_VERSION_1);
+ headers.put(ConfigConstants.MSG_ENCODE_VER,
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+ headers.put(EventConstants.HEADER_KEY_VERSION,
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
// inlongGroupId string inlongGroupId
headers.put(EventConstants.INLONG_GROUP_ID,
profile.getInlongGroupId());
// inlongStreamId string inlongStreamId
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
index ca4bcceed4..9bd682429b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeUtils.java
@@ -17,12 +17,12 @@
package org.apache.inlong.dataproxy.sink.common;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.utils.Constants;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.corebase.Message;
@@ -62,7 +62,7 @@ public class TubeUtils {
Map<String, String> headers = event.getHeaders();
Message message = new Message(topicName, event.getBody());
String pkgVersion = headers.get(ConfigConstants.MSG_ENCODE_VER);
- if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+ if
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
{
long dataTimeL =
Long.parseLong(headers.get(ConfigConstants.PKG_TIME_KEY));
message.putSystemHeader(headers.get(Constants.INLONG_STREAM_ID),
DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 9daebf5c90..f1b4f54473 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -54,6 +54,7 @@ public class BatchPackManager {
private final long dispatchTimeout;
private final long maxPackCount;
private final long maxPackSize;
+ private final String sinkName;
private final BufferQueue<PackProfile> dispatchQueue;
private final ConcurrentHashMap<String, PackProfile> profileCache = new
ConcurrentHashMap<>();
// flag that manager need to output overtime data.
@@ -63,11 +64,13 @@ public class BatchPackManager {
/**
* Constructor
- *
+ *
+ * @param sinkName the sink name
* @param context the process context
* @param dispatchQueue the batch queue
*/
- public BatchPackManager(Context context, BufferQueue<PackProfile>
dispatchQueue) {
+ public BatchPackManager(String sinkName, Context context,
BufferQueue<PackProfile> dispatchQueue) {
+ this.sinkName = sinkName;
this.dispatchQueue = dispatchQueue;
this.dispatchTimeout = context.getLong(KEY_DISPATCH_TIMEOUT,
DEFAULT_DISPATCH_TIMEOUT);
this.maxPackCount = context.getLong(KEY_DISPATCH_MAX_PACKCOUNT,
DEFAULT_DISPATCH_MAX_PACKCOUNT);
@@ -167,8 +170,8 @@ public class BatchPackManager {
if (!needOutputOvertimeData.getAndSet(false)) {
return;
}
- LOG.debug("start to outputOvertimeData
profileCacheSize:{},dispatchQueueSize:{}",
- profileCache.size(), dispatchQueue.size());
+ int profileSize = profileCache.size();
+ int dispatchSize = dispatchQueue.size();
long currentTime = System.currentTimeMillis();
long createThreshold = currentTime - dispatchTimeout;
List<String> removeKeys = new ArrayList<>();
@@ -190,10 +193,15 @@ public class BatchPackManager {
outCounter.addAndGet(dispatchProfile.getCount());
}
});
- LOG.debug("end to outputOvertimeData
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
- + "inCounter:{},outCounter:{}",
- profileCache.size(), dispatchQueue.size(), eventCount,
- inCounter.getAndSet(0), outCounter.getAndSet(0));
+ long hisInCnt = inCounter.getAndSet(0);
+ long hisOutCnt = outCounter.getAndSet(0);
+ if (!removeKeys.isEmpty()) {
+ LOG.info("{} output overtime data, profileCacheSize: before={},
after={},"
+ + " dispatchQueueSize: before={}, after={}, eventCount:
{},"
+ + " inCounter: {}, outCounter: {}",
+ sinkName, profileSize, profileCache.size(), dispatchSize,
dispatchQueue.size(),
+ eventCount, hisInCnt, hisOutCnt);
+ }
}
/**
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
index 543a454ba6..0a91655ce4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSink.java
@@ -53,7 +53,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
private Context parentContext;
private MessageQueueZoneSinkContext context;
- private List<MessageQueueZoneWorker> workers = new ArrayList<>();
+ private final List<MessageQueueZoneWorker> workers = new ArrayList<>();
// message group
private BatchPackManager dispatchManager;
private BufferQueue<PackProfile> dispatchQueue;
@@ -74,7 +74,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
/**
* configure
*
- * @param context
+ * @param context the sink context
*/
@Override
public void configure(Context context) {
@@ -96,7 +96,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
LOG.error(getName() + "'s channel is null");
}
this.context.start();
- this.dispatchManager = new BatchPackManager(parentContext,
dispatchQueue);
+ this.dispatchManager = new BatchPackManager(getName(),
parentContext, dispatchQueue);
this.scheduledPool = Executors.newScheduledThreadPool(2);
// dispatch
this.scheduledPool.scheduleWithFixedDelay(new Runnable() {
@@ -157,7 +157,7 @@ public class MessageQueueZoneSink extends AbstractSink
implements Configurable,
/**
* process
*
- * @return Status
+ * @return Status
* @throws EventDeliveryException
*/
@Override
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
index 15f0879ad0..9b60cffa3d 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
@@ -284,12 +284,12 @@ public class MessageQueueZoneSinkContext extends
SinkContext {
if (packProfile instanceof SimplePackProfile) {
SimplePackProfile simpleProfile = (SimplePackProfile) packProfile;
StringBuilder statsKey = new StringBuilder(512)
- .append(sinkName).append(AttrConstants.SEPARATOR)
-
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEPARATOR)
-
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEPARATOR)
- .append(topic).append(AttrConstants.SEPARATOR)
-
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEPARATOR)
- .append(remoteId).append(AttrConstants.SEPARATOR)
+ .append(sinkName).append(AttrConstants.SEP_HASHTAG)
+
.append(simpleProfile.getInlongGroupId()).append(AttrConstants.SEP_HASHTAG)
+
.append(simpleProfile.getInlongStreamId()).append(AttrConstants.SEP_HASHTAG)
+ .append(topic).append(AttrConstants.SEP_HASHTAG)
+
.append(NetworkUtils.getLocalIp()).append(AttrConstants.SEP_HASHTAG)
+ .append(remoteId).append(AttrConstants.SEP_HASHTAG)
.append(simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
monitorIndex.addSuccStats(statsKey.toString(), NumberUtils.toInt(
simpleProfile.getProperties().get(ConfigConstants.MSG_COUNTER_KEY), 1),
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
index 99d6ac63c9..c816feb14e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.java
@@ -21,8 +21,12 @@ import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
+import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
+import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.source2.InLongMessageHandler;
+import org.apache.inlong.dataproxy.utils.Constants;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import io.netty.buffer.ByteBuf;
@@ -33,6 +37,7 @@ import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -142,6 +147,20 @@ public class SimplePackProfile extends PackProfile {
return event.getHeaders();
}
+ /**
+ * get required properties sent to MQ
+ *
+ * @return the properties
+ */
+ public Map<String, String> getPropsToMQ() {
+ Map<String, String> result = new HashMap<>();
+ result.put(Constants.HEADER_KEY_SOURCE_TIME,
event.getHeaders().get(AttributeConstants.RCV_TIME));
+ result.put(ConfigConstants.MSG_ENCODE_VER,
event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER));
+ result.put(EventConstants.HEADER_KEY_VERSION,
event.getHeaders().get(EventConstants.HEADER_KEY_VERSION));
+ result.put(ConfigConstants.DATAPROXY_IP_KEY,
NetworkUtils.getLocalIp());
+ return result;
+ }
+
/**
* Return response to client in source
*/
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
index 40a2b4aa7a..e91f41f4b4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java
@@ -158,8 +158,7 @@ public class KafkaHandler implements MessageQueueHandler {
}
return true;
} catch (Exception ex) {
-
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
- profile.getUid() + "." + topic);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
topic);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -225,7 +224,7 @@ public class KafkaHandler implements MessageQueueHandler {
private void sendSimplePackProfile(SimplePackProfile simpleProfile,
IdTopicConfig idConfig,
String topic) throws Exception {
// headers
- Map<String, String> headers = simpleProfile.getProperties();
+ Map<String, String> headers = simpleProfile.getPropsToMQ();
// body
byte[] bodyBytes = simpleProfile.getEvent().getBody();
// metric
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
index e0a57e5e94..3933045d74 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java
@@ -266,8 +266,7 @@ public class PulsarHandler implements MessageQueueHandler {
}
return true;
} catch (Exception ex) {
-
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
- profile.getUid() + "." + producerTopic);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
producerTopic);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -346,7 +345,7 @@ public class PulsarHandler implements MessageQueueHandler {
Producer<byte[]> producer,
String producerTopic) throws Exception {
// headers
- Map<String, String> headers = simpleProfile.getProperties();
+ Map<String, String> headers = simpleProfile.getPropsToMQ();
// body
byte[] bodyBytes = simpleProfile.getEvent().getBody();
// metric
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
index d78e282d78..e8054f8863 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java
@@ -26,12 +26,13 @@ import
org.apache.inlong.dataproxy.config.pojo.IdTopicConfig;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.sink.common.EventHandler;
-import org.apache.inlong.dataproxy.sink.common.TubeUtils;
import org.apache.inlong.dataproxy.sink.mq.BatchPackProfile;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueHandler;
import org.apache.inlong.dataproxy.sink.mq.MessageQueueZoneSinkContext;
import org.apache.inlong.dataproxy.sink.mq.PackProfile;
import org.apache.inlong.dataproxy.sink.mq.SimplePackProfile;
+import org.apache.inlong.dataproxy.utils.DateTimeUtils;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.exception.TubeClientException;
import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory;
@@ -228,8 +229,7 @@ public class TubeHandler implements MessageQueueHandler {
}
return true;
} catch (Throwable ex) {
- sinkContext.fileMetricIncWithDetailStats(
- StatConstants.EVENT_SINK_SEND_EXCEPTION, profile.getUid()
+ "." + topic);
+
sinkContext.fileMetricIncWithDetailStats(StatConstants.EVENT_SINK_SEND_EXCEPTION,
topic);
sinkContext.processSendFail(profile, clusterName,
profile.getUid(), 0,
DataProxyErrCode.SEND_REQUEST_TO_MQ_FAILURE,
ex.getMessage());
if (logCounter.shouldPrint()) {
@@ -249,19 +249,18 @@ public class TubeHandler implements MessageQueueHandler {
handler = this.sinkContext.createEventHandler();
handlerLocal.set(handler);
}
- // headers
+ // get headers to mq
Map<String, String> headers = handler.parseHeader(idConfig,
batchProfile, sinkContext.getNodeId(),
sinkContext.getCompressType());
// compress
byte[] bodyBytes = handler.parseBody(idConfig, batchProfile,
sinkContext.getCompressType());
- // metric
- sinkContext.addSendMetric(batchProfile, clusterName, topic,
bodyBytes.length);
- // sendAsync
Message message = new Message(topic, bodyBytes);
// add headers
- headers.forEach((key, value) -> {
- message.setAttrKeyVal(key, value);
- });
+ long dataTimeL =
Long.parseLong(headers.get(EventConstants.HEADER_KEY_PACK_TIME));
+ message.putSystemHeader(batchProfile.getInlongStreamId(),
DateTimeUtils.ms2yyyyMMddHHmm(dataTimeL));
+ headers.forEach(message::setAttrKeyVal);
+ // metric
+ sinkContext.addSendMetric(batchProfile, clusterName, topic,
bodyBytes.length);
// callback
long sendTime = System.currentTimeMillis();
MessageSentCallback callback = new MessageSentCallback() {
@@ -302,7 +301,12 @@ public class TubeHandler implements MessageQueueHandler {
private void sendSimplePackProfile(SimplePackProfile simpleProfile,
IdTopicConfig idConfig,
String topic) throws Exception {
// build message
- Message message = TubeUtils.buildMessage(topic,
simpleProfile.getEvent());
+ Message message = new Message(topic,
simpleProfile.getEvent().getBody());
+ message.putSystemHeader(simpleProfile.getInlongStreamId(),
+
simpleProfile.getProperties().get(ConfigConstants.PKG_TIME_KEY));
+ // add headers
+ Map<String, String> headers = simpleProfile.getPropsToMQ();
+ headers.forEach(message::setAttrKeyVal);
// metric
sinkContext.addSendMetric(simpleProfile, clusterName, topic,
simpleProfile.getEvent().getBody().length);
// callback
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
index 5669e8de08..bfb5a3fa55 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.monitor.MonitorIndex;
import org.apache.inlong.common.monitor.MonitorIndexExt;
import org.apache.inlong.common.msg.AttributeConstants;
@@ -34,8 +35,8 @@ import
org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
import org.apache.inlong.dataproxy.utils.MessageUtils;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@@ -473,7 +474,10 @@ public class ServerMessageHandler extends
ChannelInboundHandlerAdapter {
headers.put(ConfigConstants.REMOTE_IDC_KEY,
DEFAULT_REMOTE_IDC_VALUE);
headers.put(ConfigConstants.MSG_COUNTER_KEY,
commonAttrMap.get(AttributeConstants.MESSAGE_COUNT));
- headers.put(ConfigConstants.MSG_ENCODE_VER,
InLongMsgVer.INLONG_V0.getName());
+ headers.put(ConfigConstants.MSG_ENCODE_VER,
+
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ headers.put(EventConstants.HEADER_KEY_VERSION,
+
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
headers.put(AttributeConstants.RCV_TIME,
commonAttrMap.get(AttributeConstants.RCV_TIME));
headers.put(ConfigConstants.DECODER_ATTRS,
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
index 5db66d1d5a..2d0c8ebbd4 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/SimpleMessageHandler.java
@@ -17,6 +17,7 @@
package org.apache.inlong.dataproxy.source;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.common.msg.MsgType;
@@ -30,7 +31,7 @@ import
org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.Constants;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@@ -426,9 +427,9 @@ public class SimpleMessageHandler extends
ChannelInboundHandlerAdapter {
commonHeaders.get(AttributeConstants.DATA_TIME));
headers.put(Constants.HEADER_KEY_SOURCE_IP,
commonHeaders.get(AttributeConstants.NODE_IP));
- headers.put(ConfigConstants.MSG_ENCODE_VER,
InLongMsgVer.INLONG_V1.getName());
- Event event = EventBuilder.withBody(proxyMessage.getData(), headers);
- return event;
+ headers.put(ConfigConstants.MSG_ENCODE_VER,
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+ headers.put(EventConstants.HEADER_KEY_VERSION,
DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId());
+ return EventBuilder.withBody(proxyMessage.getData(), headers);
}
private void responsePackage(Map<String, String> commonAttrMap,
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
index 2244207f06..e17b35228b 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/httpMsg/InLongHttpMsgHandler.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.source2.httpMsg;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.InLongMsg;
@@ -29,7 +30,7 @@ import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.source2.BaseSource;
import org.apache.inlong.dataproxy.utils.AddressUtils;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
@@ -343,7 +344,10 @@ public class InLongHttpMsgHandler extends
SimpleChannelInboundHandler<FullHttpRe
eventHeaders.put(AttributeConstants.DATA_TIME,
String.valueOf(dataTime));
eventHeaders.put(ConfigConstants.REMOTE_IP_KEY, clientIp);
eventHeaders.put(ConfigConstants.MSG_COUNTER_KEY, strMsgCount);
- eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
InLongMsgVer.INLONG_V0.getName());
+ eventHeaders.put(ConfigConstants.MSG_ENCODE_VER,
+ DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ eventHeaders.put(EventConstants.HEADER_KEY_VERSION,
+ DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
eventHeaders.put(AttributeConstants.RCV_TIME,
String.valueOf(msgRcvTime));
eventHeaders.put(ConfigConstants.PKG_TIME_KEY,
DateTimeUtils.ms2yyyyMMddHHmm(pkgTime));
Event event = EventBuilder.withBody(inlongMsgData, eventHeaders);
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
index 69fee83565..83d82da94e 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source2/v0msg/AbsV0MsgCodec.java
@@ -18,12 +18,13 @@
package org.apache.inlong.dataproxy.source2.v0msg;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.StatConstants;
import org.apache.inlong.dataproxy.source2.BaseSource;
import org.apache.inlong.dataproxy.utils.DateTimeUtils;
-import org.apache.inlong.dataproxy.utils.InLongMsgVer;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
@@ -222,15 +223,14 @@ public abstract class AbsV0MsgCodec {
headers.put(AttributeConstants.DATA_TIME, String.valueOf(dataTimeMs));
headers.put(ConfigConstants.REMOTE_IP_KEY, strRemoteIP);
headers.put(ConfigConstants.MSG_COUNTER_KEY, String.valueOf(msgCount));
- headers.put(ConfigConstants.MSG_ENCODE_VER,
InLongMsgVer.INLONG_V0.getName());
+ headers.put(ConfigConstants.MSG_ENCODE_VER,
+ DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
+ headers.put(EventConstants.HEADER_KEY_VERSION,
+ DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getStrId());
headers.put(AttributeConstants.RCV_TIME, String.valueOf(msgRcvTime));
headers.put(AttributeConstants.UNIQ_ID, String.valueOf(uniq));
+ headers.put(ConfigConstants.PKG_TIME_KEY,
DateTimeUtils.ms2yyyyMMddHHmm(pkgTime));
// add extra key-value information
- String pkgTimeStr = attrMap.get(ConfigConstants.PKG_TIME_KEY);
- if (StringUtils.isBlank(pkgTimeStr)) {
- pkgTimeStr = DateTimeUtils.ms2yyyyMMddHHmm(pkgTime);
- }
- headers.put(ConfigConstants.PKG_TIME_KEY, pkgTimeStr);
if (!needResp) {
headers.put(AttributeConstants.MESSAGE_IS_ACK, "false");
}
diff --git
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
index d72a8fe542..7b8c3f305a 100644
---
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
+++
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/utils/MessageUtils.java
@@ -18,6 +18,7 @@
package org.apache.inlong.dataproxy.utils;
import org.apache.inlong.common.enums.DataProxyErrCode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.msg.MsgType;
@@ -387,7 +388,7 @@ public class MessageUtils {
// common attributes
Map<String, String> attrs = new HashMap<>();
attrs.put(ConfigConstants.MSG_ENCODE_VER, pkgVersion);
- if (InLongMsgVer.INLONG_V1.getName().equalsIgnoreCase(pkgVersion)) {
+ if
(DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getStrId().equalsIgnoreCase(pkgVersion))
{
attrs.put("dataproxyip", NetworkUtils.getLocalIp());
attrs.put(Constants.INLONG_GROUP_ID,
headers.get(Constants.INLONG_GROUP_ID));
attrs.put(Constants.INLONG_STREAM_ID,
headers.get(Constants.INLONG_STREAM_ID));
diff --git
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
index ba3221b874..cad0a4895d 100644
---
a/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
+++
b/inlong-sdk/sdk-common/src/main/java/org/apache/inlong/sdk/commons/protocol/EventConstants.java
@@ -23,8 +23,10 @@ package org.apache.inlong.sdk.commons.protocol;
*/
public interface EventConstants {
+ @Deprecated // replace by ConfigConstants.MSG_ENCODE_VER
String HEADER_KEY_VERSION = "version";
String HEADER_SDK_VERSION_1 = "1";
+ @Deprecated // replace by DataProxyMsgEncType.MSG_ENCODE_VER_PB
String HEADER_CACHE_VERSION_1 = "1";
// sdk
String INLONG_GROUP_ID = "inlongGroupId";
diff --git
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
index e2f07ea374..99894456ca 100644
---
a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
+++
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/decode/MessageDeserializer.java
@@ -17,7 +17,9 @@
package org.apache.inlong.sdk.sort.impl.decode;
+import org.apache.inlong.common.enums.DataProxyMsgEncType;
import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sdk.commons.protocol.EventConstants;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MapFieldEntry;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
@@ -40,13 +42,9 @@ import java.util.Optional;
public class MessageDeserializer implements Deserializer {
- private static final int MESSAGE_VERSION_NONE = 0;
- private static final int MESSAGE_VERSION_PB = 1;
- private static final int MESSAGE_VERSION_INLONG_MSG = 2;
private static final int COMPRESS_TYPE_NONE = 0;
private static final int COMPRESS_TYPE_GZIP = 1;
private static final int COMPRESS_TYPE_SNAPPY = 2;
- private static final String VERSION_KEY = "version";
private static final String COMPRESS_TYPE_KEY = "compressType";
private static final String MSG_TIME_KEY = "msgTime";
private static final String SOURCE_IP_KEY = "sourceIp";
@@ -75,19 +73,16 @@ public class MessageDeserializer implements Deserializer {
byte[] data) throws Exception {
// 1. version
- int version = Integer.parseInt(headers.getOrDefault(VERSION_KEY,
Integer.toString(MESSAGE_VERSION_INLONG_MSG)));
- switch (version) {
- case MESSAGE_VERSION_NONE: {
- return decode(context, inLongTopic, data, headers);
- }
- case MESSAGE_VERSION_PB: {
- return decodePB(context, inLongTopic, data, headers);
- }
- case MESSAGE_VERSION_INLONG_MSG: {
- return decodeInlongMsg(context, inLongTopic, data, headers);
- }
- default:
- throw new IllegalArgumentException("Unknown version type:" +
version);
+ int version =
Integer.parseInt(headers.getOrDefault(EventConstants.HEADER_KEY_VERSION,
+
Integer.toString(DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId())));
+ if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_RAW.getId()) {
+ return decode(context, inLongTopic, data, headers);
+ } else if (version == DataProxyMsgEncType.MSG_ENCODE_TYPE_PB.getId()) {
+ return decodePB(context, inLongTopic, data, headers);
+ } else if (version ==
DataProxyMsgEncType.MSG_ENCODE_TYPE_INLONGMSG.getId()) {
+ return decodeInlongMsg(context, inLongTopic, data, headers);
+ } else {
+ throw new IllegalArgumentException("Unknown version type:" +
version);
}
}